From 6cf68aead6886d397772ea176a9f5c35fdb63071 Mon Sep 17 00:00:00 2001 From: Ben Noordhuis Date: Thu, 13 Dec 2012 20:23:01 +0100 Subject: [PATCH] deps: upgrade libuv to e079a99 --- deps/uv/.gitignore | 2 - deps/uv/Makefile | 4 +- deps/uv/config-mingw.mk | 4 +- deps/uv/config-unix.mk | 32 +- deps/uv/include/uv-private/uv-darwin.h | 3 + deps/uv/include/uv-private/uv-unix.h | 7 +- deps/uv/include/uv.h | 56 ++- deps/uv/src/unix/async.c | 2 + deps/uv/src/unix/core.c | 25 +- deps/uv/src/unix/fs.c | 10 +- deps/uv/src/unix/getaddrinfo.c | 19 +- deps/uv/src/unix/internal.h | 12 +- deps/uv/src/unix/pipe.c | 17 +- deps/uv/src/unix/process.c | 9 +- deps/uv/src/unix/stream.c | 349 ++++++++++++++++-- deps/uv/src/unix/tcp.c | 50 ++- deps/uv/src/unix/threadpool.c | 145 ++++++-- deps/uv/src/unix/timer.c | 9 +- deps/uv/src/unix/tty.c | 4 +- deps/uv/src/unix/udp.c | 2 +- deps/uv/src/win/core.c | 10 + deps/uv/src/win/error.c | 2 + deps/uv/src/win/process.c | 18 +- deps/uv/src/win/threadpool.c | 10 +- deps/uv/src/win/util.c | 260 ++++++++----- deps/uv/test/test-condvar-consumer-producer.c | 7 +- deps/uv/test/test-embed.c | 132 +++++++ deps/uv/test/test-list.h | 15 + deps/uv/test/test-tcp-read-stop.c | 73 ++++ deps/uv/test/test-threadpool-cancel.c | 266 +++++++++++++ deps/uv/test/test-threadpool.c | 21 +- deps/uv/uv.gyp | 3 + src/node_crypto.cc | 9 +- src/node_zlib.cc | 4 +- 34 files changed, 1340 insertions(+), 251 deletions(-) create mode 100644 deps/uv/test/test-embed.c create mode 100644 deps/uv/test/test-tcp-read-stop.c create mode 100644 deps/uv/test/test-threadpool-cancel.c diff --git a/deps/uv/.gitignore b/deps/uv/.gitignore index 0a4d7dabe8f..29b70ae0afc 100644 --- a/deps/uv/.gitignore +++ b/deps/uv/.gitignore @@ -31,5 +31,3 @@ UpgradeLog*.XML Debug Release ipch -*.mk -*.Makefile diff --git a/deps/uv/Makefile b/deps/uv/Makefile index 5b2ff7f1a3c..4012b6e00da 100644 --- a/deps/uv/Makefile +++ b/deps/uv/Makefile @@ -44,10 +44,10 @@ BENCHMARKS=test/blackhole-server.c test/echo-server.c test/dns-server.c test/ben all: libuv.a test/run-tests$(E): test/run-tests.c test/runner.c $(RUNNER_SRC) $(TESTS) libuv.$(SOEXT) - $(CC) $(CPPFLAGS) $(RUNNER_CFLAGS) -o $@ $^ $(RUNNER_LIBS) $(RUNNER_LINKFLAGS) + $(CC) $(CPPFLAGS) $(RUNNER_CFLAGS) -o $@ $^ $(RUNNER_LIBS) $(RUNNER_LDFLAGS) test/run-benchmarks$(E): test/run-benchmarks.c test/runner.c $(RUNNER_SRC) $(BENCHMARKS) libuv.$(SOEXT) - $(CC) $(CPPFLAGS) $(RUNNER_CFLAGS) -o $@ $^ $(RUNNER_LIBS) $(RUNNER_LINKFLAGS) + $(CC) $(CPPFLAGS) $(RUNNER_CFLAGS) -o $@ $^ $(RUNNER_LIBS) $(RUNNER_LDFLAGS) test/echo.o: test/echo.c test/echo.h diff --git a/deps/uv/config-mingw.mk b/deps/uv/config-mingw.mk index 74e934aa1f4..662831dcaca 100644 --- a/deps/uv/config-mingw.mk +++ b/deps/uv/config-mingw.mk @@ -25,13 +25,13 @@ AR = $(PREFIX)ar E=.exe CFLAGS=$(CPPFLAGS) -g --std=gnu89 -D_WIN32_WINNT=0x0600 -LINKFLAGS=-lm +LDFLAGS=-lm WIN_SRCS=$(wildcard src/win/*.c) WIN_OBJS=$(WIN_SRCS:.c=.o) RUNNER_CFLAGS=$(CFLAGS) -D_GNU_SOURCE # Need _GNU_SOURCE for strdup? -RUNNER_LINKFLAGS=$(LINKFLAGS) +RUNNER_LDFLAGS=$(LDFLAGS) RUNNER_LIBS=-lws2_32 -lpsapi -liphlpapi RUNNER_SRC=test/runner-win.c diff --git a/deps/uv/config-unix.mk b/deps/uv/config-unix.mk index 66c7f3fa4ca..37828f29f63 100644 --- a/deps/uv/config-unix.mk +++ b/deps/uv/config-unix.mk @@ -22,14 +22,14 @@ E= CSTDFLAG=--std=c89 -pedantic -Wall -Wextra -Wno-unused-parameter CFLAGS += -g CPPFLAGS += -Isrc -LINKFLAGS=-lm +LDFLAGS=-lm CPPFLAGS += -D_LARGEFILE_SOURCE CPPFLAGS += -D_FILE_OFFSET_BITS=64 RUNNER_SRC=test/runner-unix.c RUNNER_CFLAGS=$(CFLAGS) -Itest -RUNNER_LINKFLAGS=-L"$(PWD)" -luv -Xlinker -rpath -Xlinker "$(PWD)" +RUNNER_LDFLAGS=-L"$(PWD)" -luv -Xlinker -rpath -Xlinker "$(PWD)" OBJS += src/unix/async.o OBJS += src/unix/core.o @@ -56,21 +56,21 @@ OBJS += src/inet.o ifeq (SunOS,$(uname_S)) CPPFLAGS += -D__EXTENSIONS__ -D_XOPEN_SOURCE=500 -LINKFLAGS+=-lkstat -lnsl -lsendfile -lsocket +LDFLAGS+=-lkstat -lnsl -lsendfile -lsocket # Library dependencies are not transitive. -RUNNER_LINKFLAGS += $(LINKFLAGS) +RUNNER_LDFLAGS += $(LDFLAGS) OBJS += src/unix/sunos.o endif ifeq (AIX,$(uname_S)) CPPFLAGS += -Isrc/ares/config_aix -D_ALL_SOURCE -D_XOPEN_SOURCE=500 -LINKFLAGS+= -lperfstat +LDFLAGS+= -lperfstat OBJS += src/unix/aix.o endif ifeq (Darwin,$(uname_S)) CPPFLAGS += -D_DARWIN_USE_64_BIT_INODE=1 -LINKFLAGS+=-framework CoreServices -dynamiclib -install_name "@rpath/libuv.dylib" +LDFLAGS+=-framework CoreServices -dynamiclib -install_name "@rpath/libuv.dylib" SOEXT = dylib OBJS += src/unix/darwin.o OBJS += src/unix/kqueue.o @@ -79,7 +79,7 @@ endif ifeq (Linux,$(uname_S)) CSTDFLAG += -D_GNU_SOURCE -LINKFLAGS+=-ldl -lrt +LDFLAGS+=-ldl -lrt RUNNER_CFLAGS += -D_GNU_SOURCE OBJS += src/unix/linux/linux-core.o \ src/unix/linux/inotify.o \ @@ -87,25 +87,25 @@ OBJS += src/unix/linux/linux-core.o \ endif ifeq (FreeBSD,$(uname_S)) -LINKFLAGS+=-lkvm +LDFLAGS+=-lkvm OBJS += src/unix/freebsd.o OBJS += src/unix/kqueue.o endif ifeq (DragonFly,$(uname_S)) -LINKFLAGS+=-lkvm +LDFLAGS+=-lkvm OBJS += src/unix/freebsd.o OBJS += src/unix/kqueue.o endif ifeq (NetBSD,$(uname_S)) -LINKFLAGS+=-lkvm +LDFLAGS+=-lkvm OBJS += src/unix/netbsd.o OBJS += src/unix/kqueue.o endif ifeq (OpenBSD,$(uname_S)) -LINKFLAGS+=-lkvm +LDFLAGS+=-lkvm OBJS += src/unix/openbsd.o OBJS += src/unix/kqueue.o endif @@ -113,22 +113,22 @@ endif ifneq (,$(findstring CYGWIN,$(uname_S))) # We drop the --std=c89, it hides CLOCK_MONOTONIC on cygwin CSTDFLAG = -D_GNU_SOURCE -LINKFLAGS+= +LDFLAGS+= OBJS += src/unix/cygwin.o endif ifeq (SunOS,$(uname_S)) -RUNNER_LINKFLAGS += -pthreads +RUNNER_LDFLAGS += -pthreads else -RUNNER_LINKFLAGS += -pthread +RUNNER_LDFLAGS += -pthread endif libuv.a: $(OBJS) $(AR) rcs $@ $^ -libuv.$(SOEXT): CFLAGS += -fPIC +libuv.$(SOEXT): override CFLAGS += -fPIC libuv.$(SOEXT): $(OBJS) - $(CC) -shared -o $@ $^ $(LINKFLAGS) + $(CC) -shared -o $@ $^ $(LDFLAGS) src/%.o: src/%.c include/uv.h include/uv-private/uv-unix.h $(CC) $(CSTDFLAG) $(CPPFLAGS) $(CFLAGS) -c $< -o $@ diff --git a/deps/uv/include/uv-private/uv-darwin.h b/deps/uv/include/uv-private/uv-darwin.h index c4bfd0ff9e9..f11c12aeead 100644 --- a/deps/uv/include/uv-private/uv-darwin.h +++ b/deps/uv/include/uv-private/uv-darwin.h @@ -49,4 +49,7 @@ uv_sem_t cf_sem; \ uv_mutex_t cf_mutex; \ +#define UV_STREAM_PRIVATE_PLATFORM_FIELDS \ + void* select; \ + #endif /* UV_DARWIN_H */ diff --git a/deps/uv/include/uv-private/uv-unix.h b/deps/uv/include/uv-private/uv-unix.h index 683a9c92988..6d2efbe2027 100644 --- a/deps/uv/include/uv-private/uv-unix.h +++ b/deps/uv/include/uv-private/uv-unix.h @@ -60,7 +60,7 @@ struct uv__io_s { struct uv__work { void (*work)(struct uv__work *w); - void (*done)(struct uv__work *w); + void (*done)(struct uv__work *w, int status); struct uv_loop_s* loop; ngx_queue_t wq; }; @@ -90,6 +90,10 @@ struct uv__work { # define UV_PLATFORM_FS_EVENT_FIELDS /* empty */ #endif +#ifndef UV_STREAM_PRIVATE_PLATFORM_FIELDS +# define UV_STREAM_PRIVATE_PLATFORM_FIELDS /* empty */ +#endif + /* Note: May be cast to struct iovec. See writev(2). */ typedef struct { char* base; @@ -209,6 +213,7 @@ typedef struct { uv_connection_cb connection_cb; \ int delayed_error; \ int accepted_fd; \ + UV_STREAM_PRIVATE_PLATFORM_FIELDS \ #define UV_TCP_PRIVATE_FIELDS /* empty */ diff --git a/deps/uv/include/uv.h b/deps/uv/include/uv.h index b187d6cb357..57ce8ae5ad2 100644 --- a/deps/uv/include/uv.h +++ b/deps/uv/include/uv.h @@ -261,6 +261,28 @@ UV_EXTERN void uv_unref(uv_handle_t*); UV_EXTERN void uv_update_time(uv_loop_t*); UV_EXTERN int64_t uv_now(uv_loop_t*); +/* + * Get backend file descriptor. Only kqueue, epoll and event ports are + * supported. + * + * This can be used in conjuction with uv_run_once() to poll in one thread and + * run the event loop's event callbacks in another. + * + * Useful for embedding libuv's event loop in another event loop. + * See test/test-embed.c for an example. + * + * Note that embedding a kqueue fd in another kqueue pollset doesn't work on + * all platforms. It's not an error to add the fd but it never generates + * events. + */ +UV_EXTERN int uv_backend_fd(const uv_loop_t*); + +/* + * Get the poll timeout. The return value is in milliseconds, or -1 for no + * timeout. + */ +UV_EXTERN int uv_backend_timeout(const uv_loop_t*); + /* * Should return a buffer that libuv can use to read data into. @@ -308,7 +330,7 @@ typedef void (*uv_exit_cb)(uv_process_t*, int exit_status, int term_signal); typedef void (*uv_walk_cb)(uv_handle_t* handle, void* arg); typedef void (*uv_fs_cb)(uv_fs_t* req); typedef void (*uv_work_cb)(uv_work_t* req); -typedef void (*uv_after_work_cb)(uv_work_t* req); +typedef void (*uv_after_work_cb)(uv_work_t* req, int status); typedef void (*uv_getaddrinfo_cb)(uv_getaddrinfo_t* req, int status, struct addrinfo* res); @@ -1314,7 +1336,13 @@ enum uv_process_flags { * parent's event loop alive unless the parent process calls uv_unref() on * the child's process handle. */ - UV_PROCESS_DETACHED = (1 << 3) + UV_PROCESS_DETACHED = (1 << 3), + /* + * Hide the subprocess console window that would normally be created. This + * option is only meaningful on Windows systems. On unix it is silently + * ignored. + */ + UV_PROCESS_WINDOWS_HIDE = (1 << 4) }; /* @@ -1358,6 +1386,30 @@ struct uv_work_s { UV_EXTERN int uv_queue_work(uv_loop_t* loop, uv_work_t* req, uv_work_cb work_cb, uv_after_work_cb after_work_cb); +/* Cancel a pending request. Fails if the request is executing or has finished + * executing. + * + * Returns 0 on success, -1 on error. The loop error code is not touched. + * + * Only cancellation of uv_fs_t, uv_getaddrinfo_t and uv_work_t requests is + * currently supported. + * + * Cancelled requests have their callbacks invoked some time in the future. + * It's _not_ safe to free the memory associated with the request until your + * callback is called. + * + * Here is how cancellation is reported to your callback: + * + * - A uv_fs_t request has its req->errorno field set to UV_ECANCELED. + * + * - A uv_work_t or uv_getaddrinfo_t request has its callback invoked with + * status == -1 and uv_last_error(loop).code == UV_ECANCELED. + * + * This function is currently only implemented on UNIX platforms. On Windows, + * it always returns -1. + */ +UV_EXTERN int uv_cancel(uv_req_t* req); + struct uv_cpu_info_s { char* model; diff --git a/deps/uv/src/unix/async.c b/deps/uv/src/unix/async.c index 479bc8e23c8..d45aa5423b2 100644 --- a/deps/uv/src/unix/async.c +++ b/deps/uv/src/unix/async.c @@ -84,6 +84,8 @@ int uv_async_send(uv_async_t* handle) { r = write(handle->loop->async_pipefd[1], "x", 1); while (r == -1 && errno == EINTR); + assert(r == -1 || r == 1); + if (r == -1 && errno != EAGAIN && errno != EWOULDBLOCK) return uv__set_sys_error(handle->loop, errno); diff --git a/deps/uv/src/unix/core.c b/deps/uv/src/unix/core.c index 06d689ca2f5..b0686ce0ac9 100644 --- a/deps/uv/src/unix/core.c +++ b/deps/uv/src/unix/core.c @@ -248,7 +248,12 @@ void uv_loop_delete(uv_loop_t* loop) { } -static unsigned int uv__poll_timeout(uv_loop_t* loop) { +int uv_backend_fd(const uv_loop_t* loop) { + return loop->backend_fd; +} + + +int uv_backend_timeout(const uv_loop_t* loop) { if (!uv__has_active_handles(loop) && !uv__has_active_reqs(loop)) return 0; @@ -268,7 +273,7 @@ static int uv__run(uv_loop_t* loop) { uv__run_idle(loop); uv__run_prepare(loop); uv__run_pending(loop); - uv__io_poll(loop, uv__poll_timeout(loop)); + uv__io_poll(loop, uv_backend_timeout(loop)); uv__run_check(loop); uv__run_closing_handles(loop); return uv__has_active_handles(loop) || uv__has_active_reqs(loop); @@ -325,6 +330,13 @@ int uv__socket(int domain, int type, int protocol) { sockfd = -1; } +#if defined(SO_NOSIGPIPE) + { + int on = 1; + setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &on, sizeof(on)); + } +#endif + out: return sockfd; } @@ -629,9 +641,6 @@ void uv__io_stop(uv_loop_t* loop, uv__io_t* w, unsigned int events) { w->pevents &= ~events; if (w->pevents == 0) { - ngx_queue_remove(&w->pending_queue); - ngx_queue_init(&w->pending_queue); - ngx_queue_remove(&w->watcher_queue); ngx_queue_init(&w->watcher_queue); @@ -648,6 +657,12 @@ void uv__io_stop(uv_loop_t* loop, uv__io_t* w, unsigned int events) { } +void uv__io_close(uv_loop_t* loop, uv__io_t* w) { + uv__io_stop(loop, w, UV__POLLIN | UV__POLLOUT); + ngx_queue_remove(&w->pending_queue); +} + + void uv__io_feed(uv_loop_t* loop, uv__io_t* w) { if (ngx_queue_empty(&w->pending_queue)) ngx_queue_insert_tail(&loop->pending_queue, &w->pending_queue); diff --git a/deps/uv/src/unix/fs.c b/deps/uv/src/unix/fs.c index 1957fc14988..6c6faf53cda 100644 --- a/deps/uv/src/unix/fs.c +++ b/deps/uv/src/unix/fs.c @@ -90,7 +90,7 @@ } \ else { \ uv__fs_work(&(req)->work_req); \ - uv__fs_done(&(req)->work_req); \ + uv__fs_done(&(req)->work_req, 0); \ return (req)->result; \ } \ } \ @@ -516,7 +516,7 @@ static void uv__fs_work(struct uv__work* w) { } -static void uv__fs_done(struct uv__work* w) { +static void uv__fs_done(struct uv__work* w, int status) { uv_fs_t* req; req = container_of(w, uv_fs_t, work_req); @@ -527,6 +527,12 @@ static void uv__fs_done(struct uv__work* w) { uv__set_artificial_error(req->loop, req->errorno); } + if (status == -UV_ECANCELED) { + assert(req->errorno == 0); + req->errorno = UV_ECANCELED; + uv__set_artificial_error(req->loop, UV_ECANCELED); + } + if (req->cb != NULL) req->cb(req); } diff --git a/deps/uv/src/unix/getaddrinfo.c b/deps/uv/src/unix/getaddrinfo.c index d6bc6988127..7f147291dbb 100644 --- a/deps/uv/src/unix/getaddrinfo.c +++ b/deps/uv/src/unix/getaddrinfo.c @@ -37,11 +37,16 @@ static void uv__getaddrinfo_work(struct uv__work* w) { } -static void uv__getaddrinfo_done(struct uv__work* w) { +static void uv__getaddrinfo_done(struct uv__work* w, int status) { uv_getaddrinfo_t* req = container_of(w, uv_getaddrinfo_t, work_req); struct addrinfo *res = req->res; #if __sun - size_t hostlen = strlen(req->hostname); + size_t hostlen; + + if (req->hostname) + hostlen = strlen(req->hostname); + else + hostlen = 0; #endif req->res = NULL; @@ -58,6 +63,10 @@ static void uv__getaddrinfo_done(struct uv__work* w) { else assert(0); + req->hints = NULL; + req->service = NULL; + req->hostname = NULL; + if (req->retcode == 0) { /* OK */ #if EAI_NODATA /* FreeBSD deprecated EAI_NODATA */ @@ -75,6 +84,12 @@ static void uv__getaddrinfo_done(struct uv__work* w) { req->loop->last_err.sys_errno_ = req->retcode; } + if (status == -UV_ECANCELED) { + assert(req->retcode == 0); + req->retcode = UV_ECANCELED; + uv__set_artificial_error(req->loop, UV_ECANCELED); + } + req->cb(req, req->retcode, res); } diff --git a/deps/uv/src/unix/internal.h b/deps/uv/src/unix/internal.h index 60544618224..786897de976 100644 --- a/deps/uv/src/unix/internal.h +++ b/deps/uv/src/unix/internal.h @@ -130,6 +130,7 @@ void uv__make_close_pending(uv_handle_t* handle); void uv__io_init(uv__io_t* w, uv__io_cb cb, int fd); void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events); void uv__io_stop(uv_loop_t* loop, uv__io_t* w, unsigned int events); +void uv__io_close(uv_loop_t* loop, uv__io_t* w); void uv__io_feed(uv_loop_t* loop, uv__io_t* w); int uv__io_active(const uv__io_t* w, unsigned int events); void uv__io_poll(uv_loop_t* loop, int timeout); /* in milliseconds or -1 */ @@ -163,7 +164,7 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb); /* timer */ void uv__run_timers(uv_loop_t* loop); -unsigned int uv__next_timeout(uv_loop_t* loop); +int uv__next_timeout(const uv_loop_t* loop); /* signal */ void uv__signal_close(uv_signal_t* handle); @@ -174,7 +175,7 @@ void uv__signal_loop_cleanup(); void uv__work_submit(uv_loop_t* loop, struct uv__work *w, void (*work)(struct uv__work *w), - void (*done)(struct uv__work *w)); + void (*done)(struct uv__work *w, int status)); void uv__work_done(uv_async_t* handle, int status); /* platform specific */ @@ -197,6 +198,13 @@ void uv__timer_close(uv_timer_t* handle); void uv__udp_close(uv_udp_t* handle); void uv__udp_finish_close(uv_udp_t* handle); +#if defined(__APPLE__) +int uv___stream_fd(uv_stream_t* handle); +#define uv__stream_fd(handle) (uv___stream_fd((uv_stream_t*) (handle))) +#else +#define uv__stream_fd(handle) ((handle)->io_watcher.fd) +#endif /* defined(__APPLE__) */ + #ifdef UV__O_NONBLOCK # define UV__F_NONBLOCK UV__O_NONBLOCK #else diff --git a/deps/uv/src/unix/pipe.c b/deps/uv/src/unix/pipe.c index f432f4539a6..b28c8ef9193 100644 --- a/deps/uv/src/unix/pipe.c +++ b/deps/uv/src/unix/pipe.c @@ -57,7 +57,7 @@ int uv_pipe_bind(uv_pipe_t* handle, const char* name) { bound = 0; /* Already bound? */ - if (handle->io_watcher.fd >= 0) { + if (uv__stream_fd(handle) >= 0) { uv__set_artificial_error(handle->loop, UV_EINVAL); goto out; } @@ -117,13 +117,13 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) { saved_errno = errno; status = -1; - if (handle->io_watcher.fd == -1) { + if (uv__stream_fd(handle) == -1) { uv__set_artificial_error(handle->loop, UV_EINVAL); goto out; } - assert(handle->io_watcher.fd >= 0); + assert(uv__stream_fd(handle) >= 0); - if ((status = listen(handle->io_watcher.fd, backlog)) == -1) { + if ((status = listen(uv__stream_fd(handle), backlog)) == -1) { uv__set_sys_error(handle->loop, errno); } else { handle->connection_cb = cb; @@ -172,7 +172,7 @@ void uv_pipe_connect(uv_connect_t* req, int r; saved_errno = errno; - new_sock = (handle->io_watcher.fd == -1); + new_sock = (uv__stream_fd(handle) == -1); err = -1; if (new_sock) @@ -187,7 +187,8 @@ void uv_pipe_connect(uv_connect_t* req, * is either there or not. */ do { - r = connect(handle->io_watcher.fd, (struct sockaddr*)&saddr, sizeof saddr); + r = connect(uv__stream_fd(handle), + (struct sockaddr*)&saddr, sizeof saddr); } while (r == -1 && errno == EINTR); @@ -196,7 +197,7 @@ void uv_pipe_connect(uv_connect_t* req, if (new_sock) if (uv__stream_open((uv_stream_t*)handle, - handle->io_watcher.fd, + uv__stream_fd(handle), UV_STREAM_READABLE | UV_STREAM_WRITABLE)) goto out; @@ -233,7 +234,7 @@ static void uv__pipe_accept(uv_loop_t* loop, uv__io_t* w, unsigned int events) { assert(pipe->type == UV_NAMED_PIPE); - sockfd = uv__accept(pipe->io_watcher.fd); + sockfd = uv__accept(uv__stream_fd(pipe)); if (sockfd == -1) { if (errno != EAGAIN && errno != EWOULDBLOCK) { uv__set_sys_error(pipe->loop, errno); diff --git a/deps/uv/src/unix/process.c b/deps/uv/src/unix/process.c index 9ff722c0c1c..c4df81a9740 100644 --- a/deps/uv/src/unix/process.c +++ b/deps/uv/src/unix/process.c @@ -204,7 +204,7 @@ static int uv__process_init_stdio(uv_stdio_container_t* container, int fds[2]) { if (container->flags & UV_INHERIT_FD) { fd = container->data.fd; } else { - fd = container->data.stream->io_watcher.fd; + fd = uv__stream_fd(container->data.stream); } if (fd == -1) { @@ -363,10 +363,11 @@ int uv_spawn(uv_loop_t* loop, int i; assert(options.file != NULL); - assert(!(options.flags & ~(UV_PROCESS_WINDOWS_VERBATIM_ARGUMENTS | - UV_PROCESS_DETACHED | + assert(!(options.flags & ~(UV_PROCESS_DETACHED | UV_PROCESS_SETGID | - UV_PROCESS_SETUID))); + UV_PROCESS_SETUID | + UV_PROCESS_WINDOWS_HIDE | + UV_PROCESS_WINDOWS_VERBATIM_ARGUMENTS))); uv__handle_init(loop, (uv_handle_t*)process, UV_PROCESS); ngx_queue_init(&process->queue); diff --git a/deps/uv/src/unix/stream.c b/deps/uv/src/unix/stream.c index 6af6020af85..a3193a9b072 100644 --- a/deps/uv/src/unix/stream.c +++ b/deps/uv/src/unix/stream.c @@ -34,6 +34,26 @@ #include #include +#if defined(__APPLE__) +# include +# include +# include + +/* Forward declaration */ +typedef struct uv__stream_select_s uv__stream_select_t; + +struct uv__stream_select_s { + uv_stream_t* stream; + uv_thread_t thread; + uv_sem_t sem; + uv_mutex_t mutex; + uv_async_t async; + int events; + int fake_fd; + int int_fd; + int fd; +}; +#endif /* defined(__APPLE__) */ static void uv__stream_connect(uv_stream_t*); static void uv__write(uv_stream_t* stream); @@ -96,23 +116,231 @@ void uv__stream_init(uv_loop_t* loop, if (loop->emfile_fd == -1) loop->emfile_fd = uv__open_cloexec("/", O_RDONLY); +#if defined(__APPLE__) + stream->select = NULL; +#endif /* defined(__APPLE_) */ + uv__io_init(&stream->io_watcher, uv__stream_io, -1); } -int uv__stream_open(uv_stream_t* stream, int fd, int flags) { - socklen_t yes; +#if defined(__APPLE__) +void uv__stream_osx_select(void* arg) { + uv_stream_t* stream; + uv__stream_select_t* s; + fd_set read; + fd_set write; + fd_set error; + struct timeval timeout; + int events; + int fd; + int r; + int max_fd; + + stream = arg; + s = stream->select; + fd = stream->io_watcher.fd; + + if (fd > s->int_fd) + max_fd = fd; + else + max_fd = s->int_fd; + + while (1) { + /* Terminate on semaphore */ + if (uv_sem_trywait(&s->sem) == 0) + break; + + /* Watch fd using select(2) */ + FD_ZERO(&read); + FD_ZERO(&write); + FD_ZERO(&error); + + if (uv_is_readable(stream)) + FD_SET(fd, &read); + if (uv_is_writable(stream)) + FD_SET(fd, &write); + FD_SET(fd, &error); + FD_SET(s->int_fd, &read); + + timeout.tv_sec = 0; + timeout.tv_usec = 250000; /* 250 ms timeout */ + r = select(max_fd + 1, &read, &write, &error, &timeout); + if (r == -1) { + if (errno == EINTR) + continue; + + /* XXX: Possible?! */ + abort(); + } + + /* Ignore timeouts */ + if (r == 0) + continue; + + /* Handle events */ + events = 0; + if (FD_ISSET(fd, &read)) + events |= UV__POLLIN; + if (FD_ISSET(fd, &write)) + events |= UV__POLLOUT; + if (FD_ISSET(fd, &error)) + events |= UV__POLLERR; + + uv_mutex_lock(&s->mutex); + s->events |= events; + uv_mutex_unlock(&s->mutex); + + if (events != 0) + uv_async_send(&s->async); + } +} + + +void uv__stream_osx_interrupt_select(uv_stream_t* stream) { + /* Notify select() thread about state change */ + uv__stream_select_t* s; + int r; + + s = stream->select; + + /* Interrupt select() loop + * NOTE: fake_fd and int_fd are socketpair(), thus writing to one will + * emit read event on other side + */ + do + r = write(s->fake_fd, "x", 1); + while (r == -1 && errno == EINTR); + + assert(r == 1); +} + + +void uv__stream_osx_select_cb(uv_async_t* handle, int status) { + uv__stream_select_t* s; + uv_stream_t* stream; + int events; + + s = container_of(handle, uv__stream_select_t, async); + stream = s->stream; + + /* Get and reset stream's events */ + uv_mutex_lock(&s->mutex); + events = s->events; + s->events = 0; + uv_mutex_unlock(&s->mutex); + + assert(0 == (events & UV__POLLERR)); + + /* Invoke callback on event-loop */ + if ((events & UV__POLLIN) && uv__io_active(&stream->io_watcher, UV__POLLIN)) + uv__stream_io(stream->loop, &stream->io_watcher, UV__POLLIN); + + if ((events & UV__POLLOUT) && uv__io_active(&stream->io_watcher, UV__POLLOUT)) + uv__stream_io(stream->loop, &stream->io_watcher, UV__POLLOUT); +} + + +void uv__stream_osx_cb_close(uv_handle_t* async) { + uv__stream_select_t* s; + + s = container_of(async, uv__stream_select_t, async); + free(s); +} + +int uv__stream_try_select(uv_stream_t* stream, int fd) { + /* + * kqueue doesn't work with some files from /dev mount on osx. + * select(2) in separate thread for those fds + */ + + struct kevent filter[1]; + struct kevent events[1]; + struct timespec timeout; + uv__stream_select_t* s; + int fds[2]; + int ret; + int kq; + + kq = kqueue(); + if (kq == -1) { + fprintf(stderr, "(libuv) Failed to create kqueue (%d)\n", errno); + return uv__set_sys_error(stream->loop, errno); + } + + EV_SET(&filter[0], fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0); + + /* Use small timeout, because we only want to capture EINVALs */ + timeout.tv_sec = 0; + timeout.tv_nsec = 1; + + ret = kevent(kq, filter, 1, events, 1, &timeout); + SAVE_ERRNO(close(kq)); + + if (ret == -1) + return uv__set_sys_error(stream->loop, errno); + + if ((events[0].flags & EV_ERROR) == 0 || events[0].data != EINVAL) + return 0; + + /* At this point we definitely know that this fd won't work with kqueue */ + s = malloc(sizeof(*s)); + if (s == NULL) + return uv__set_artificial_error(stream->loop, UV_ENOMEM); + + s->fd = fd; + + if (uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb)) { + SAVE_ERRNO(free(s)); + return uv__set_sys_error(stream->loop, errno); + } + + s->async.flags |= UV__HANDLE_INTERNAL; + uv__handle_unref(&s->async); + + if (uv_sem_init(&s->sem, 0)) + goto fatal1; + + if (uv_mutex_init(&s->mutex)) + goto fatal2; + + /* Create fds for io watcher and to interrupt the select() loop. */ + if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds)) + goto fatal3; + + s->fake_fd = fds[0]; + s->int_fd = fds[1]; + + if (uv_thread_create(&s->thread, uv__stream_osx_select, stream)) + goto fatal4; + + s->stream = stream; + stream->select = s; + + return 0; + +fatal4: + close(s->fake_fd); + close(s->int_fd); + s->fake_fd = -1; + s->int_fd = -1; +fatal3: + uv_mutex_destroy(&s->mutex); +fatal2: + uv_sem_destroy(&s->sem); +fatal1: + uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close); + return uv__set_sys_error(stream->loop, errno); +} +#endif /* defined(__APPLE__) */ + + +int uv__stream_open(uv_stream_t* stream, int fd, int flags) { assert(fd >= 0); stream->flags |= flags; if (stream->type == UV_TCP) { - /* Reuse the port address if applicable. */ - yes = 1; - - if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes) == -1) - return uv__set_sys_error(stream->loop, errno); - if ((stream->flags & UV_TCP_NODELAY) && uv__tcp_nodelay(fd, 1)) return uv__set_sys_error(stream->loop, errno); @@ -121,6 +349,21 @@ int uv__stream_open(uv_stream_t* stream, int fd, int flags) { return uv__set_sys_error(stream->loop, errno); } +#if defined(__APPLE__) + { + uv__stream_select_t* s; + int r; + + r = uv__stream_try_select(stream, fd); + if (r == -1) + return r; + + s = stream->select; + if (s != NULL) + fd = s->fake_fd; + } +#endif /* defined(__APPLE__) */ + stream->io_watcher.fd = fd; return 0; @@ -239,9 +482,9 @@ void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { /* connection_cb can close the server socket while we're * in the loop so check it on each iteration. */ - while (stream->io_watcher.fd != -1) { + while (uv__stream_fd(stream) != -1) { assert(stream->accepted_fd == -1); - fd = uv__accept(stream->io_watcher.fd); + fd = uv__accept(uv__stream_fd(stream)); if (fd == -1) { switch (errno) { @@ -262,7 +505,7 @@ void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { } if (use_emfile_trick) { - SAVE_ERRNO(r = uv__emfile_trick(loop, stream->io_watcher.fd)); + SAVE_ERRNO(r = uv__emfile_trick(loop, uv__stream_fd(stream))); if (r == 0) continue; } @@ -394,7 +637,7 @@ static void uv__drain(uv_stream_t* stream) { stream->shutdown_req = NULL; uv__req_unregister(stream->loop, req); - if (shutdown(stream->io_watcher.fd, SHUT_WR)) { + if (shutdown(uv__stream_fd(stream), SHUT_WR)) { /* Error. Report it. User should call uv_close(). */ uv__set_sys_error(stream->loop, errno); if (req->cb) { @@ -458,7 +701,7 @@ static void uv__write(uv_stream_t* stream) { start: - assert(stream->io_watcher.fd >= 0); + assert(uv__stream_fd(stream) >= 0); /* Get the request at the head of the queue. */ req = uv_write_queue_head(stream); @@ -512,15 +755,15 @@ static void uv__write(uv_stream_t* stream) { } do { - n = sendmsg(stream->io_watcher.fd, &msg, 0); + n = sendmsg(uv__stream_fd(stream), &msg, 0); } while (n == -1 && errno == EINTR); } else { do { if (iovcnt == 1) { - n = write(stream->io_watcher.fd, iov[0].iov_base, iov[0].iov_len); + n = write(uv__stream_fd(stream), iov[0].iov_base, iov[0].iov_len); } else { - n = writev(stream->io_watcher.fd, iov, iovcnt); + n = writev(uv__stream_fd(stream), iov, iovcnt); } } while (n == -1 && errno == EINTR); @@ -669,11 +912,11 @@ static void uv__read(uv_stream_t* stream) { assert(buf.len > 0); assert(buf.base); - assert(stream->io_watcher.fd >= 0); + assert(uv__stream_fd(stream) >= 0); if (stream->read_cb) { do { - nread = read(stream->io_watcher.fd, buf.base, buf.len); + nread = read(uv__stream_fd(stream), buf.base, buf.len); } while (nread < 0 && errno == EINTR); } else { @@ -689,7 +932,7 @@ static void uv__read(uv_stream_t* stream) { msg.msg_control = (void *) cmsg_space; do { - nread = recvmsg(stream->io_watcher.fd, &msg, 0); + nread = recvmsg(uv__stream_fd(stream), &msg, 0); } while (nread < 0 && errno == EINTR); } @@ -798,7 +1041,7 @@ static void uv__read(uv_stream_t* stream) { int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) { assert((stream->type == UV_TCP || stream->type == UV_NAMED_PIPE) && "uv_shutdown (unix) only supports uv_handle_t right now"); - assert(stream->io_watcher.fd >= 0); + assert(uv__stream_fd(stream) >= 0); if (!(stream->flags & UV_STREAM_WRITABLE) || stream->flags & UV_STREAM_SHUT || @@ -837,16 +1080,16 @@ static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { } if (events & UV__POLLIN) { - assert(stream->io_watcher.fd >= 0); + assert(uv__stream_fd(stream) >= 0); uv__read(stream); - if (stream->io_watcher.fd == -1) + if (uv__stream_fd(stream) == -1) return; /* read_cb closed stream. */ } if (events & UV__POLLOUT) { - assert(stream->io_watcher.fd >= 0); + assert(uv__stream_fd(stream) >= 0); uv__write(stream); uv__write_callbacks(stream); } @@ -875,8 +1118,12 @@ static void uv__stream_connect(uv_stream_t* stream) { stream->delayed_error = 0; } else { /* Normal situation: we need to get the socket error from the kernel. */ - assert(stream->io_watcher.fd >= 0); - getsockopt(stream->io_watcher.fd, SOL_SOCKET, SO_ERROR, &error, &errorsize); + assert(uv__stream_fd(stream) >= 0); + getsockopt(uv__stream_fd(stream), + SOL_SOCKET, + SO_ERROR, + &error, + &errorsize); } if (error == EINPROGRESS) @@ -906,7 +1153,7 @@ int uv_write2(uv_write_t* req, stream->type == UV_TTY) && "uv_write (unix) does not yet support other types of streams"); - if (stream->io_watcher.fd < 0) { + if (uv__stream_fd(stream) < 0) { uv__set_sys_error(stream->loop, EBADF); return -1; } @@ -989,11 +1236,17 @@ int uv__read_start_common(uv_stream_t* stream, uv_alloc_cb alloc_cb, */ stream->flags |= UV_STREAM_READING; +#if defined(__APPLE__) + /* Notify select() thread about state change */ + if (stream->select != NULL) + uv__stream_osx_interrupt_select(stream); +#endif /* defined(__APPLE__) */ + /* TODO: try to do the read inline? */ /* TODO: keep track of tcp state. If we've gotten a EOF then we should * not start the IO watcher. */ - assert(stream->io_watcher.fd >= 0); + assert(uv__stream_fd(stream) >= 0); assert(alloc_cb); stream->read_cb = read_cb; @@ -1023,6 +1276,13 @@ int uv_read_stop(uv_stream_t* stream) { uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLIN); uv__handle_stop(stream); stream->flags &= ~UV_STREAM_READING; + +#if defined(__APPLE__) + /* Notify select() thread about state change */ + if (stream->select != NULL) + uv__stream_osx_interrupt_select(stream); +#endif /* defined(__APPLE__) */ + stream->read_cb = NULL; stream->read2_cb = NULL; stream->alloc_cb = NULL; @@ -1040,9 +1300,42 @@ int uv_is_writable(const uv_stream_t* stream) { } +#if defined(__APPLE__) +int uv___stream_fd(uv_stream_t* handle) { + uv__stream_select_t* s; + + s = handle->select; + if (s != NULL) + return s->fd; + + return handle->io_watcher.fd; +} +#endif /* defined(__APPLE__) */ + + void uv__stream_close(uv_stream_t* handle) { +#if defined(__APPLE__) + /* Terminate select loop first */ + if (handle->select != NULL) { + uv__stream_select_t* s; + + s = handle->select; + + uv_sem_post(&s->sem); + uv__stream_osx_interrupt_select(handle); + uv_thread_join(&s->thread); + uv_sem_destroy(&s->sem); + uv_mutex_destroy(&s->mutex); + close(s->fake_fd); + close(s->int_fd); + uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close); + + handle->select = NULL; + } +#endif /* defined(__APPLE__) */ + uv_read_stop(handle); - uv__io_stop(handle->loop, &handle->io_watcher, UV__POLLOUT); + uv__io_close(handle->loop, &handle->io_watcher); close(handle->io_watcher.fd); handle->io_watcher.fd = -1; diff --git a/deps/uv/src/unix/tcp.c b/deps/uv/src/unix/tcp.c index 7c86e7089f7..5229369254b 100644 --- a/deps/uv/src/unix/tcp.c +++ b/deps/uv/src/unix/tcp.c @@ -37,7 +37,7 @@ int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* tcp) { static int maybe_new_socket(uv_tcp_t* handle, int domain, int flags) { int sockfd; - if (handle->io_watcher.fd != -1) + if (uv__stream_fd(handle) != -1) return 0; sockfd = uv__socket(domain, SOCK_STREAM, 0); @@ -58,29 +58,21 @@ static int uv__bind(uv_tcp_t* tcp, int domain, struct sockaddr* addr, int addrsize) { - int saved_errno; - int status; - - saved_errno = errno; - status = -1; + int on; if (maybe_new_socket(tcp, domain, UV_STREAM_READABLE|UV_STREAM_WRITABLE)) return -1; - tcp->delayed_error = 0; - if (bind(tcp->io_watcher.fd, addr, addrsize) == -1) { - if (errno == EADDRINUSE) { - tcp->delayed_error = errno; - } else { - uv__set_sys_error(tcp->loop, errno); - goto out; - } - } - status = 0; + on = 1; + if (setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))) + return uv__set_sys_error(tcp->loop, errno); -out: - errno = saved_errno; - return status; + errno = 0; + if (bind(tcp->io_watcher.fd, addr, addrsize) && errno != EADDRINUSE) + return uv__set_sys_error(tcp->loop, errno); + + tcp->delayed_error = errno; + return 0; } @@ -105,7 +97,7 @@ static int uv__connect(uv_connect_t* req, handle->delayed_error = 0; do - r = connect(handle->io_watcher.fd, addr, addrlen); + r = connect(uv__stream_fd(handle), addr, addrlen); while (r == -1 && errno == EINTR); if (r == -1) { @@ -174,7 +166,7 @@ int uv_tcp_getsockname(uv_tcp_t* handle, struct sockaddr* name, goto out; } - if (handle->io_watcher.fd < 0) { + if (uv__stream_fd(handle) < 0) { uv__set_sys_error(handle->loop, EINVAL); rv = -1; goto out; @@ -183,7 +175,7 @@ int uv_tcp_getsockname(uv_tcp_t* handle, struct sockaddr* name, /* sizeof(socklen_t) != sizeof(int) on some systems. */ socklen = (socklen_t)*namelen; - if (getsockname(handle->io_watcher.fd, name, &socklen) == -1) { + if (getsockname(uv__stream_fd(handle), name, &socklen) == -1) { uv__set_sys_error(handle->loop, errno); rv = -1; } else { @@ -211,7 +203,7 @@ int uv_tcp_getpeername(uv_tcp_t* handle, struct sockaddr* name, goto out; } - if (handle->io_watcher.fd < 0) { + if (uv__stream_fd(handle) < 0) { uv__set_sys_error(handle->loop, EINVAL); rv = -1; goto out; @@ -220,7 +212,7 @@ int uv_tcp_getpeername(uv_tcp_t* handle, struct sockaddr* name, /* sizeof(socklen_t) != sizeof(int) on some systems. */ socklen = (socklen_t)*namelen; - if (getpeername(handle->io_watcher.fd, name, &socklen) == -1) { + if (getpeername(uv__stream_fd(handle), name, &socklen) == -1) { uv__set_sys_error(handle->loop, errno); rv = -1; } else { @@ -320,8 +312,8 @@ int uv__tcp_keepalive(int fd, int on, unsigned int delay) { int uv_tcp_nodelay(uv_tcp_t* handle, int on) { - if (handle->io_watcher.fd != -1) - if (uv__tcp_nodelay(handle->io_watcher.fd, on)) + if (uv__stream_fd(handle) != -1) + if (uv__tcp_nodelay(uv__stream_fd(handle), on)) return -1; if (on) @@ -334,8 +326,8 @@ int uv_tcp_nodelay(uv_tcp_t* handle, int on) { int uv_tcp_keepalive(uv_tcp_t* handle, int on, unsigned int delay) { - if (handle->io_watcher.fd != -1) - if (uv__tcp_keepalive(handle->io_watcher.fd, on, delay)) + if (uv__stream_fd(handle) != -1) + if (uv__tcp_keepalive(uv__stream_fd(handle), on, delay)) return -1; if (on) @@ -343,7 +335,7 @@ int uv_tcp_keepalive(uv_tcp_t* handle, int on, unsigned int delay) { else handle->flags &= ~UV_TCP_KEEPALIVE; - /* TODO Store delay if handle->io_watcher.fd == -1 but don't want to enlarge + /* TODO Store delay if uv__stream_fd(handle) == -1 but don't want to enlarge * uv_tcp_t with an int that's almost never used... */ diff --git a/deps/uv/src/unix/threadpool.c b/deps/uv/src/unix/threadpool.c index c7044a5d5d3..ee428201a5f 100644 --- a/deps/uv/src/unix/threadpool.c +++ b/deps/uv/src/unix/threadpool.c @@ -20,43 +20,48 @@ */ #include "internal.h" +#include -#include -#include - -/* TODO add condvar support to libuv */ -static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; -static pthread_once_t once = PTHREAD_ONCE_INIT; -static pthread_t threads[4]; +static uv_once_t once = UV_ONCE_INIT; +static uv_cond_t cond; +static uv_mutex_t mutex; +static uv_thread_t threads[4]; static ngx_queue_t exit_message; -static ngx_queue_t wq = { &wq, &wq }; +static ngx_queue_t wq; static volatile int initialized; -static void* worker(void* arg) { +static void uv__cancelled(struct uv__work* w) { + abort(); +} + + +/* To avoid deadlock with uv_cancel() it's crucial that the worker + * never holds the global mutex and the loop-local mutex at the same time. + */ +static void worker(void* arg) { struct uv__work* w; ngx_queue_t* q; (void) arg; for (;;) { - if (pthread_mutex_lock(&mutex)) - abort(); + uv_mutex_lock(&mutex); while (ngx_queue_empty(&wq)) - if (pthread_cond_wait(&cond, &mutex)) - abort(); + uv_cond_wait(&cond, &mutex); q = ngx_queue_head(&wq); if (q == &exit_message) - pthread_cond_signal(&cond); - else + uv_cond_signal(&cond); + else { ngx_queue_remove(q); + ngx_queue_init(q); /* Signal uv_cancel() that the work req is + executing. */ + } - if (pthread_mutex_unlock(&mutex)) - abort(); + uv_mutex_unlock(&mutex); if (q == &exit_message) break; @@ -65,36 +70,43 @@ static void* worker(void* arg) { w->work(w); uv_mutex_lock(&w->loop->wq_mutex); + w->work = NULL; /* Signal uv_cancel() that the work req is done + executing. */ ngx_queue_insert_tail(&w->loop->wq, &w->wq); - uv_mutex_unlock(&w->loop->wq_mutex); uv_async_send(&w->loop->wq_async); + uv_mutex_unlock(&w->loop->wq_mutex); } - - return NULL; } static void post(ngx_queue_t* q) { - pthread_mutex_lock(&mutex); + uv_mutex_lock(&mutex); ngx_queue_insert_tail(&wq, q); - pthread_cond_signal(&cond); - pthread_mutex_unlock(&mutex); + uv_cond_signal(&cond); + uv_mutex_unlock(&mutex); } static void init_once(void) { unsigned int i; + if (uv_cond_init(&cond)) + abort(); + + if (uv_mutex_init(&mutex)) + abort(); + ngx_queue_init(&wq); for (i = 0; i < ARRAY_SIZE(threads); i++) - if (pthread_create(threads + i, NULL, worker, NULL)) + if (uv_thread_create(threads + i, worker, NULL)) abort(); initialized = 1; } +#if defined(__GNUC__) __attribute__((destructor)) static void cleanup(void) { unsigned int i; @@ -105,18 +117,21 @@ static void cleanup(void) { post(&exit_message); for (i = 0; i < ARRAY_SIZE(threads); i++) - if (pthread_join(threads[i], NULL)) + if (uv_thread_join(threads + i)) abort(); + uv_mutex_destroy(&mutex); + uv_cond_destroy(&cond); initialized = 0; } +#endif void uv__work_submit(uv_loop_t* loop, struct uv__work* w, void (*work)(struct uv__work* w), - void (*done)(struct uv__work* w)) { - pthread_once(&once, init_once); + void (*done)(struct uv__work* w, int status)) { + uv_once(&once, init_once); w->loop = loop; w->work = work; w->done = done; @@ -124,11 +139,37 @@ void uv__work_submit(uv_loop_t* loop, } +int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) { + int cancelled; + + uv_mutex_lock(&mutex); + uv_mutex_lock(&w->loop->wq_mutex); + + cancelled = !ngx_queue_empty(&w->wq) && w->work != NULL; + if (cancelled) + ngx_queue_remove(&w->wq); + + uv_mutex_unlock(&w->loop->wq_mutex); + uv_mutex_unlock(&mutex); + + if (!cancelled) + return -1; + + w->work = uv__cancelled; + uv_mutex_lock(&loop->wq_mutex); + ngx_queue_insert_tail(&loop->wq, &w->wq); + uv_mutex_unlock(&loop->wq_mutex); + + return 0; +} + + void uv__work_done(uv_async_t* handle, int status) { struct uv__work* w; uv_loop_t* loop; ngx_queue_t* q; ngx_queue_t wq; + int err; loop = container_of(handle, uv_loop_t, wq_async); ngx_queue_init(&wq); @@ -145,7 +186,8 @@ void uv__work_done(uv_async_t* handle, int status) { ngx_queue_remove(q); w = container_of(q, struct uv__work, wq); - w->done(w); + err = (w->work == uv__cancelled) ? -UV_ECANCELED : 0; + w->done(w, err); } } @@ -153,18 +195,23 @@ void uv__work_done(uv_async_t* handle, int status) { static void uv__queue_work(struct uv__work* w) { uv_work_t* req = container_of(w, uv_work_t, work_req); - if (req->work_cb) - req->work_cb(req); + req->work_cb(req); } -static void uv__queue_done(struct uv__work* w) { - uv_work_t* req = container_of(w, uv_work_t, work_req); +static void uv__queue_done(struct uv__work* w, int status) { + uv_work_t* req; + req = container_of(w, uv_work_t, work_req); uv__req_unregister(req->loop, req); - if (req->after_work_cb) - req->after_work_cb(req); + if (req->after_work_cb == NULL) + return; + + if (status == -UV_ECANCELED) + uv__set_artificial_error(req->loop, UV_ECANCELED); + + req->after_work_cb(req, status ? -1 : 0); } @@ -172,6 +219,9 @@ int uv_queue_work(uv_loop_t* loop, uv_work_t* req, uv_work_cb work_cb, uv_after_work_cb after_work_cb) { + if (work_cb == NULL) + return uv__set_artificial_error(loop, UV_EINVAL); + uv__req_init(loop, req, UV_WORK); req->loop = loop; req->work_cb = work_cb; @@ -179,3 +229,28 @@ int uv_queue_work(uv_loop_t* loop, uv__work_submit(loop, &req->work_req, uv__queue_work, uv__queue_done); return 0; } + + +int uv_cancel(uv_req_t* req) { + struct uv__work* wreq; + uv_loop_t* loop; + + switch (req->type) { + case UV_FS: + loop = ((uv_fs_t*) req)->loop; + wreq = &((uv_fs_t*) req)->work_req; + break; + case UV_GETADDRINFO: + loop = ((uv_getaddrinfo_t*) req)->loop; + wreq = &((uv_getaddrinfo_t*) req)->work_req; + break; + case UV_WORK: + loop = ((uv_work_t*) req)->loop; + wreq = &((uv_work_t*) req)->work_req; + break; + default: + return -1; + } + + return uv__work_cancel(loop, req, wreq); +} diff --git a/deps/uv/src/unix/timer.c b/deps/uv/src/unix/timer.c index a560584deac..9708dbd89b2 100644 --- a/deps/uv/src/unix/timer.c +++ b/deps/uv/src/unix/timer.c @@ -102,13 +102,14 @@ int64_t uv_timer_get_repeat(uv_timer_t* handle) { } -unsigned int uv__next_timeout(uv_loop_t* loop) { - uv_timer_t* handle; +int uv__next_timeout(const uv_loop_t* loop) { + const uv_timer_t* handle; - handle = RB_MIN(uv__timers, &loop->timer_handles); + /* RB_MIN expects a non-const tree root. That's okay, it doesn't modify it. */ + handle = RB_MIN(uv__timers, (struct uv__timers*) &loop->timer_handles); if (handle == NULL) - return (unsigned int) -1; /* block indefinitely */ + return -1; /* block indefinitely */ if (handle->timeout <= loop->time) return 0; diff --git a/deps/uv/src/unix/tty.c b/deps/uv/src/unix/tty.c index 5c1e3609af4..faf94716cc3 100644 --- a/deps/uv/src/unix/tty.c +++ b/deps/uv/src/unix/tty.c @@ -54,7 +54,7 @@ int uv_tty_set_mode(uv_tty_t* tty, int mode) { struct termios raw; int fd; - fd = tty->io_watcher.fd; + fd = uv__stream_fd(tty); if (mode && tty->mode == 0) { /* on */ @@ -105,7 +105,7 @@ int uv_tty_set_mode(uv_tty_t* tty, int mode) { int uv_tty_get_winsize(uv_tty_t* tty, int* width, int* height) { struct winsize ws; - if (ioctl(tty->io_watcher.fd, TIOCGWINSZ, &ws) < 0) { + if (ioctl(uv__stream_fd(tty), TIOCGWINSZ, &ws) < 0) { uv__set_sys_error(tty->loop, errno); return -1; } diff --git a/deps/uv/src/unix/udp.c b/deps/uv/src/unix/udp.c index 89b83b3924d..7a970ba0ea6 100644 --- a/deps/uv/src/unix/udp.c +++ b/deps/uv/src/unix/udp.c @@ -40,7 +40,7 @@ static int uv__udp_send(uv_udp_send_t* req, uv_udp_t* handle, uv_buf_t bufs[], void uv__udp_close(uv_udp_t* handle) { - uv__io_stop(handle->loop, &handle->io_watcher, UV__POLLIN | UV__POLLOUT); + uv__io_close(handle->loop, &handle->io_watcher); uv__handle_stop(handle); close(handle->io_watcher.fd); handle->io_watcher.fd = -1; diff --git a/deps/uv/src/win/core.c b/deps/uv/src/win/core.c index 509ea563f16..3df3399bc13 100644 --- a/deps/uv/src/win/core.c +++ b/deps/uv/src/win/core.c @@ -171,6 +171,16 @@ void uv_loop_delete(uv_loop_t* loop) { } +int uv_backend_fd(const uv_loop_t* loop) { + return -1; +} + + +int uv_backend_timeout(const uv_loop_t* loop) { + return 0; +} + + static void uv_poll(uv_loop_t* loop, int block) { BOOL success; DWORD bytes, timeout; diff --git a/deps/uv/src/win/error.c b/deps/uv/src/win/error.c index fbfc78eaa20..ff79f9c814f 100644 --- a/deps/uv/src/win/error.c +++ b/deps/uv/src/win/error.c @@ -109,6 +109,7 @@ uv_err_code uv_translate_sys_error(int sys_errno) { case WSAECONNRESET: return UV_ECONNRESET; case ERROR_ALREADY_EXISTS: return UV_EEXIST; case ERROR_FILE_EXISTS: return UV_EEXIST; + case ERROR_BUFFER_OVERFLOW: return UV_EFAULT; case WSAEFAULT: return UV_EFAULT; case ERROR_HOST_UNREACHABLE: return UV_EHOSTUNREACH; case WSAEHOSTUNREACH: return UV_EHOSTUNREACH; @@ -125,6 +126,7 @@ uv_err_code uv_translate_sys_error(int sys_errno) { case ERROR_NETWORK_UNREACHABLE: return UV_ENETUNREACH; case WSAENETUNREACH: return UV_ENETUNREACH; case WSAENOBUFS: return UV_ENOBUFS; + case ERROR_NOT_ENOUGH_MEMORY: return UV_ENOMEM; case ERROR_OUTOFMEMORY: return UV_ENOMEM; case ERROR_CANNOT_MAKE: return UV_ENOSPC; case ERROR_DISK_FULL: return UV_ENOSPC; diff --git a/deps/uv/src/win/process.c b/deps/uv/src/win/process.c index 8d22e742d5b..c5649d3ae04 100644 --- a/deps/uv/src/win/process.c +++ b/deps/uv/src/win/process.c @@ -777,10 +777,11 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process, } assert(options.file != NULL); - assert(!(options.flags & ~(UV_PROCESS_WINDOWS_VERBATIM_ARGUMENTS | - UV_PROCESS_DETACHED | + assert(!(options.flags & ~(UV_PROCESS_DETACHED | UV_PROCESS_SETGID | - UV_PROCESS_SETUID))); + UV_PROCESS_SETUID | + UV_PROCESS_WINDOWS_HIDE | + UV_PROCESS_WINDOWS_VERBATIM_ARGUMENTS))); uv_process_init(loop, process); process->exit_cb = options.exit_cb; @@ -872,13 +873,22 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process, startup.lpReserved = NULL; startup.lpDesktop = NULL; startup.lpTitle = NULL; - startup.dwFlags = STARTF_USESTDHANDLES; + startup.dwFlags = STARTF_USESTDHANDLES | STARTF_USESHOWWINDOW; + startup.cbReserved2 = uv__stdio_size(process->child_stdio_buffer); startup.lpReserved2 = (BYTE*) process->child_stdio_buffer; + startup.hStdInput = uv__stdio_handle(process->child_stdio_buffer, 0); startup.hStdOutput = uv__stdio_handle(process->child_stdio_buffer, 1); startup.hStdError = uv__stdio_handle(process->child_stdio_buffer, 2); + if (options.flags & UV_PROCESS_WINDOWS_HIDE) { + /* Use SW_HIDE to avoid any potential process window. */ + startup.wShowWindow = SW_HIDE; + } else { + startup.wShowWindow = SW_SHOWDEFAULT; + } + process_flags = CREATE_UNICODE_ENVIRONMENT; if (options.flags & UV_PROCESS_DETACHED) { process_flags |= DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP; diff --git a/deps/uv/src/win/threadpool.c b/deps/uv/src/win/threadpool.c index c1a71c18e0d..1446878cce7 100644 --- a/deps/uv/src/win/threadpool.c +++ b/deps/uv/src/win/threadpool.c @@ -55,6 +55,9 @@ static DWORD WINAPI uv_work_thread_proc(void* parameter) { int uv_queue_work(uv_loop_t* loop, uv_work_t* req, uv_work_cb work_cb, uv_after_work_cb after_work_cb) { + if (work_cb == NULL) + return uv__set_artificial_error(loop, UV_EINVAL); + uv_work_req_init(loop, req, work_cb, after_work_cb); if (!QueueUserWorkItem(&uv_work_thread_proc, req, WT_EXECUTELONGFUNCTION)) { @@ -67,8 +70,13 @@ int uv_queue_work(uv_loop_t* loop, uv_work_t* req, uv_work_cb work_cb, } +int uv_cancel(uv_req_t* req) { + return -1; +} + + void uv_process_work_req(uv_loop_t* loop, uv_work_t* req) { uv__req_unregister(loop, req); if(req->after_work_cb) - req->after_work_cb(req); + req->after_work_cb(req, 0); } diff --git a/deps/uv/src/win/util.c b/deps/uv/src/win/util.c index e1a35045358..96b1abe52d3 100644 --- a/deps/uv/src/win/util.c +++ b/deps/uv/src/win/util.c @@ -740,109 +740,201 @@ void uv_free_cpu_info(uv_cpu_info_t* cpu_infos, int count) { } -uv_err_t uv_interface_addresses(uv_interface_address_t** addresses, - int* count) { - unsigned long size = 0; - IP_ADAPTER_ADDRESSES* adapter_addresses; - IP_ADAPTER_ADDRESSES* adapter_address; - uv_interface_address_t* address; - struct sockaddr* sock_addr; - int length; - char* name; - /* Use IP_ADAPTER_UNICAST_ADDRESS_XP to retain backwards compatibility */ - /* with Windows XP */ - IP_ADAPTER_UNICAST_ADDRESS_XP* unicast_address; +uv_err_t uv_interface_addresses(uv_interface_address_t** addresses_ptr, + int* count_ptr) { + IP_ADAPTER_ADDRESSES* win_address_buf; + ULONG win_address_buf_size; + IP_ADAPTER_ADDRESSES* win_address; - if (GetAdaptersAddresses(AF_UNSPEC, 0, NULL, NULL, &size) - != ERROR_BUFFER_OVERFLOW) { - return uv__new_sys_error(GetLastError()); - } + uv_interface_address_t* uv_address_buf; + char* name_buf; + size_t uv_address_buf_size; + uv_interface_address_t* uv_address; - adapter_addresses = (IP_ADAPTER_ADDRESSES*)malloc(size); - if (!adapter_addresses) { - uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); - } + int count; - if (GetAdaptersAddresses(AF_UNSPEC, 0, NULL, adapter_addresses, &size) - != ERROR_SUCCESS) { - return uv__new_sys_error(GetLastError()); - } + /* Fetch the size of the adapters reported by windows, and then get the */ + /* list itself. */ + win_address_buf_size = 0; + win_address_buf = NULL; - /* Count the number of interfaces */ - *count = 0; + for (;;) { + ULONG r; - for (adapter_address = adapter_addresses; - adapter_address != NULL; - adapter_address = adapter_address->Next) { + /* If win_address_buf is 0, then GetAdaptersAddresses will fail with */ + /* ERROR_BUFFER_OVERFLOW, and the required buffer size will be stored in */ + /* win_address_buf_size. */ + r = GetAdaptersAddresses(AF_UNSPEC, + 0, + NULL, + win_address_buf, + &win_address_buf_size); - if (adapter_address->OperStatus != IfOperStatusUp) - continue; + if (r == ERROR_SUCCESS) + break; + + free(win_address_buf); + + switch (r) { + case ERROR_BUFFER_OVERFLOW: + /* This happens when win_address_buf is NULL or too small to hold */ + /* all adapters. */ + win_address_buf = malloc(win_address_buf_size); + if (win_address_buf == NULL) + return uv__new_artificial_error(UV_ENOMEM); + + continue; - unicast_address = (IP_ADAPTER_UNICAST_ADDRESS_XP*) - adapter_address->FirstUnicastAddress; + case ERROR_NO_DATA: { + /* No adapters were found. */ + uv_address_buf = malloc(1); + if (uv_address_buf == NULL) + return uv__new_artificial_error(UV_ENOMEM); - while (unicast_address) { - (*count)++; - unicast_address = unicast_address->Next; + *count_ptr = 0; + *addresses_ptr = uv_address_buf; + + return uv_ok_; + } + + case ERROR_ADDRESS_NOT_ASSOCIATED: + return uv__new_artificial_error(UV_EAGAIN); + + case ERROR_INVALID_PARAMETER: + /* MSDN says: + * "This error is returned for any of the following conditions: the + * SizePointer parameter is NULL, the Address parameter is not + * AF_INET, AF_INET6, or AF_UNSPEC, or the address information for + * the parameters requested is greater than ULONG_MAX." + * Since the first two conditions are not met, it must be that the + * adapter data is too big. + */ + return uv__new_artificial_error(UV_ENOBUFS); + + default: + /* Other (unspecified) errors can happen, but we don't have any */ + /* special meaning for them. */ + assert(r != ERROR_SUCCESS); + return uv__new_sys_error(r); } } - *addresses = (uv_interface_address_t*) - malloc(*count * sizeof(uv_interface_address_t)); - if (!(*addresses)) { - uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); + /* Count the number of enabled interfaces and compute how much space is */ + /* needed to store their info. */ + count = 0; + uv_address_buf_size = 0; + + for (win_address = win_address_buf; + win_address != NULL; + win_address = win_address->Next) { + /* Use IP_ADAPTER_UNICAST_ADDRESS_XP to retain backwards compatibility */ + /* with Windows XP */ + IP_ADAPTER_UNICAST_ADDRESS_XP* unicast_address; + int name_size; + + /* Interfaces that are not 'up' should not be reported. Also skip */ + /* interfaces that have no associated unicast address, as to avoid */ + /* allocating space for the name for this interface. */ + if (win_address->OperStatus != IfOperStatusUp || + win_address->FirstUnicastAddress == NULL) + continue; + + /* Compute the size of the interface name. */ + name_size = WideCharToMultiByte(CP_UTF8, + 0, + win_address->FriendlyName, + -1, + NULL, + 0, + NULL, + FALSE); + if (name_size <= 0) { + free(win_address_buf); + return uv__new_sys_error(GetLastError()); + } + uv_address_buf_size += name_size; + + /* Count the number of addresses associated with this interface, and */ + /* compute the size. */ + for (unicast_address = (IP_ADAPTER_UNICAST_ADDRESS_XP*) + win_address->FirstUnicastAddress; + unicast_address != NULL; + unicast_address = unicast_address->Next) { + count++; + uv_address_buf_size += sizeof(uv_interface_address_t); + } } - address = *addresses; + /* Allocate space to store interface data plus adapter names. */ + uv_address_buf = malloc(uv_address_buf_size); + if (uv_address_buf == NULL) { + free(win_address_buf); + return uv__new_artificial_error(UV_ENOMEM); + } - for (adapter_address = adapter_addresses; - adapter_address != NULL; - adapter_address = adapter_address->Next) { + /* Compute the start of the uv_interface_address_t array, and the place in */ + /* the buffer where the interface names will be stored. */ + uv_address = uv_address_buf; + name_buf = (char*) (uv_address_buf + count); - if (adapter_address->OperStatus != IfOperStatusUp) + /* Fill out the output buffer. */ + for (win_address = win_address_buf; + win_address != NULL; + win_address = win_address->Next) { + IP_ADAPTER_UNICAST_ADDRESS_XP* unicast_address; + int name_size; + size_t max_name_size; + + if (win_address->OperStatus != IfOperStatusUp || + win_address->FirstUnicastAddress == NULL) continue; - name = NULL; - unicast_address = (IP_ADAPTER_UNICAST_ADDRESS_XP*) - adapter_address->FirstUnicastAddress; + /* Convert the interface name to UTF8. */ + max_name_size = (char*) uv_address_buf + uv_address_buf_size - name_buf; + if (max_name_size > (size_t) INT_MAX) + max_name_size = INT_MAX; + name_size = WideCharToMultiByte(CP_UTF8, + 0, + win_address->FriendlyName, + -1, + name_buf, + (int) max_name_size, + NULL, + FALSE); + if (name_size <= 0) { + free(win_address_buf); + free(uv_address_buf); + return uv__new_sys_error(GetLastError()); + } - while (unicast_address) { - sock_addr = unicast_address->Address.lpSockaddr; - if (sock_addr->sa_family == AF_INET6) { - address->address.address6 = *((struct sockaddr_in6 *)sock_addr); - } else { - address->address.address4 = *((struct sockaddr_in *)sock_addr); - } + /* Add an uv_interface_address_t element for every unicast address. */ + for (unicast_address = (IP_ADAPTER_UNICAST_ADDRESS_XP*) + win_address->FirstUnicastAddress; + unicast_address != NULL; + unicast_address = unicast_address->Next) { + struct sockaddr* sa; - address->is_internal = - adapter_address->IfType == IF_TYPE_SOFTWARE_LOOPBACK ? 1 : 0; - - if (!name) { - /* Convert FriendlyName to utf8 */ - length = uv_utf16_to_utf8(adapter_address->FriendlyName, -1, NULL, 0); - if (length) { - name = (char*)malloc(length); - if (!name) { - uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); - } - - if (!uv_utf16_to_utf8(adapter_address->FriendlyName, -1, name, - length)) { - free(name); - name = NULL; - } - } - } + uv_address->name = name_buf; + + sa = unicast_address->Address.lpSockaddr; + if (sa->sa_family == AF_INET6) + uv_address->address.address6 = *((struct sockaddr_in6 *) sa); + else + uv_address->address.address4 = *((struct sockaddr_in *) sa); - assert(name); - address->name = name; + uv_address->is_internal = + (win_address->IfType == IF_TYPE_SOFTWARE_LOOPBACK); - unicast_address = unicast_address->Next; - address++; + uv_address++; } + + name_buf += name_size; } - free(adapter_addresses); + free(win_address_buf); + + *addresses_ptr = uv_address_buf; + *count_ptr = count; return uv_ok_; } @@ -850,15 +942,5 @@ uv_err_t uv_interface_addresses(uv_interface_address_t** addresses, void uv_free_interface_addresses(uv_interface_address_t* addresses, int count) { - int i; - char* freed_name = NULL; - - for (i = 0; i < count; i++) { - if (freed_name != addresses[i].name) { - freed_name = addresses[i].name; - free(freed_name); - } - } - free(addresses); } diff --git a/deps/uv/test/test-condvar-consumer-producer.c b/deps/uv/test/test-condvar-consumer-producer.c index b2e8d3d9d93..a7bd5a657b1 100644 --- a/deps/uv/test/test-condvar-consumer-producer.c +++ b/deps/uv/test/test-condvar-consumer-producer.c @@ -82,9 +82,6 @@ static void producer(void* arg) { uv_cond_signal(&full); uv_mutex_unlock(&mutex); } - - LOGF("finished_consumers: %d\n", finished_consumers); - ASSERT(finished_consumers == MAX_CONSUMERS); } @@ -129,6 +126,10 @@ TEST_IMPL(consumer_producer) { } ASSERT(0 == uv_thread_join(&pthread)); + + LOGF("finished_consumers: %d\n", finished_consumers); + ASSERT(finished_consumers == MAX_CONSUMERS); + uv_cond_destroy(&empty); uv_cond_destroy(&full); uv_mutex_destroy(&mutex); diff --git a/deps/uv/test/test-embed.c b/deps/uv/test/test-embed.c new file mode 100644 index 00000000000..e635596e7ab --- /dev/null +++ b/deps/uv/test/test-embed.c @@ -0,0 +1,132 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "uv.h" +#include "task.h" +#include +#include +#include + +#ifndef HAVE_KQUEUE +# if __APPLE__ || __DragonFly__ || __FreeBSD__ || __OpenBSD__ || __NetBSD__ +# define HAVE_KQUEUE 1 +# endif +#endif + +#ifndef HAVE_EPOLL +# if defined(__linux__) +# define HAVE_EPOLL 1 +# endif +#endif + +#if defined(HAVE_KQUEUE) || defined(HAVE_EPOLL) + +#if defined(HAVE_KQUEUE) +# include +# include +# include +#endif + +#if defined(HAVE_EPOLL) +# include +#endif + +static uv_thread_t embed_thread; +static uv_sem_t embed_sem; +static uv_timer_t embed_timer; +static uv_async_t embed_async; +static volatile int embed_closed; + +static int embed_timer_called; + + +static void embed_thread_runner(void* arg) { + int r; + int fd; + int timeout; + + while (!embed_closed) { + fd = uv_backend_fd(uv_default_loop()); + timeout = uv_backend_timeout(uv_default_loop()); + + do { +#if defined(HAVE_KQUEUE) + struct timespec ts; + ts.tv_sec = timeout / 1000; + ts.tv_nsec = (timeout % 1000) * 1000000; + r = kevent(fd, NULL, 0, NULL, 0, &ts); +#elif defined(HAVE_EPOLL) + r = epoll_wait(fd, NULL, 0, timeout); +#endif + } while (r == -1 && errno == EINTR); + uv_async_send(&embed_async); + uv_sem_wait(&embed_sem); + } +} + + +static void embed_cb(uv_async_t* async, int status) { + uv_run_once(uv_default_loop()); + + uv_sem_post(&embed_sem); +} + + +static void embed_timer_cb(uv_timer_t* timer, int status) { + embed_timer_called++; + embed_closed = 1; + + uv_close((uv_handle_t*) &embed_async, NULL); +} +#endif + + +TEST_IMPL(embed) { +#if defined(HAVE_KQUEUE) || defined(HAVE_EPOLL) + uv_loop_t* external; + + external = uv_loop_new(); + ASSERT(external != NULL); + + embed_timer_called = 0; + embed_closed = 0; + + uv_async_init(external, &embed_async, embed_cb); + + /* Start timer in default loop */ + uv_timer_init(uv_default_loop(), &embed_timer); + uv_timer_start(&embed_timer, embed_timer_cb, 250, 0); + + /* Start worker that will interrupt external loop */ + uv_sem_init(&embed_sem, 0); + uv_thread_create(&embed_thread, embed_thread_runner, NULL); + + /* But run external loop */ + uv_run(external); + + uv_thread_join(&embed_thread); + uv_loop_delete(external); + + ASSERT(embed_timer_called == 1); +#endif + + return 0; +} diff --git a/deps/uv/test/test-list.h b/deps/uv/test/test-list.h index ffa68360aee..82a96a03239 100644 --- a/deps/uv/test/test-list.h +++ b/deps/uv/test/test-list.h @@ -67,6 +67,7 @@ TEST_DECLARE (tcp_flags) TEST_DECLARE (tcp_write_error) TEST_DECLARE (tcp_write_to_half_open_connection) TEST_DECLARE (tcp_unexpected_read) +TEST_DECLARE (tcp_read_stop) TEST_DECLARE (tcp_bind6_error_addrinuse) TEST_DECLARE (tcp_bind6_error_addrnotavail) TEST_DECLARE (tcp_bind6_error_fault) @@ -124,6 +125,7 @@ TEST_DECLARE (pipe_ref3) TEST_DECLARE (pipe_ref4) TEST_DECLARE (process_ref) TEST_DECLARE (active) +TEST_DECLARE (embed) TEST_DECLARE (async) TEST_DECLARE (get_currentexe) TEST_DECLARE (process_title) @@ -184,7 +186,11 @@ TEST_DECLARE (fs_readdir_file) TEST_DECLARE (fs_open_dir) TEST_DECLARE (fs_rename_to_existing_file) TEST_DECLARE (threadpool_queue_work_simple) +TEST_DECLARE (threadpool_queue_work_einval) TEST_DECLARE (threadpool_multiple_event_loops) +TEST_DECLARE (threadpool_cancel_getaddrinfo) +TEST_DECLARE (threadpool_cancel_work) +TEST_DECLARE (threadpool_cancel_fs) TEST_DECLARE (thread_mutex) TEST_DECLARE (thread_rwlock) TEST_DECLARE (thread_create) @@ -284,6 +290,9 @@ TASK_LIST_START TEST_ENTRY (tcp_write_to_half_open_connection) TEST_ENTRY (tcp_unexpected_read) + TEST_ENTRY (tcp_read_stop) + TEST_HELPER (tcp_read_stop, tcp4_echo_server) + TEST_ENTRY (tcp_bind6_error_addrinuse) TEST_ENTRY (tcp_bind6_error_addrnotavail) TEST_ENTRY (tcp_bind6_error_fault) @@ -362,6 +371,8 @@ TASK_LIST_START TEST_ENTRY (active) + TEST_ENTRY (embed) + TEST_ENTRY (async) TEST_ENTRY (get_currentexe) @@ -448,7 +459,11 @@ TASK_LIST_START TEST_ENTRY (fs_open_dir) TEST_ENTRY (fs_rename_to_existing_file) TEST_ENTRY (threadpool_queue_work_simple) + TEST_ENTRY (threadpool_queue_work_einval) TEST_ENTRY (threadpool_multiple_event_loops) + TEST_ENTRY (threadpool_cancel_getaddrinfo) + TEST_ENTRY (threadpool_cancel_work) + TEST_ENTRY (threadpool_cancel_fs) TEST_ENTRY (thread_mutex) TEST_ENTRY (thread_rwlock) TEST_ENTRY (thread_create) diff --git a/deps/uv/test/test-tcp-read-stop.c b/deps/uv/test/test-tcp-read-stop.c new file mode 100644 index 00000000000..9ed30eed61c --- /dev/null +++ b/deps/uv/test/test-tcp-read-stop.c @@ -0,0 +1,73 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "uv.h" +#include "task.h" + +static uv_timer_t timer_handle; +static uv_tcp_t tcp_handle; +static uv_write_t write_req; + + +static void fail_cb(void) { + ASSERT(0 && "fail_cb called"); +} + + +static void write_cb(uv_write_t* req, int status) { + uv_close((uv_handle_t*) &timer_handle, NULL); + uv_close((uv_handle_t*) &tcp_handle, NULL); +} + + +static void timer_cb(uv_timer_t* handle, int status) { + uv_buf_t buf = uv_buf_init("PING", 4); + ASSERT(0 == uv_write(&write_req, + (uv_stream_t*) &tcp_handle, + &buf, + 1, + write_cb)); + ASSERT(0 == uv_read_stop((uv_stream_t*) &tcp_handle)); +} + + +static void connect_cb(uv_connect_t* req, int status) { + ASSERT(0 == status); + ASSERT(0 == uv_timer_start(&timer_handle, timer_cb, 50, 0)); + ASSERT(0 == uv_read_start((uv_stream_t*) &tcp_handle, + (uv_alloc_cb) fail_cb, + (uv_read_cb) fail_cb)); +} + + +TEST_IMPL(tcp_read_stop) { + uv_connect_t connect_req; + struct sockaddr_in addr; + + addr = uv_ip4_addr("127.0.0.1", TEST_PORT); + ASSERT(0 == uv_timer_init(uv_default_loop(), &timer_handle)); + ASSERT(0 == uv_tcp_init(uv_default_loop(), &tcp_handle)); + ASSERT(0 == uv_tcp_connect(&connect_req, &tcp_handle, addr, connect_cb)); + ASSERT(0 == uv_run(uv_default_loop())); + MAKE_VALGRIND_HAPPY(); + + return 0; +} diff --git a/deps/uv/test/test-threadpool-cancel.c b/deps/uv/test/test-threadpool-cancel.c new file mode 100644 index 00000000000..db0397afe06 --- /dev/null +++ b/deps/uv/test/test-threadpool-cancel.c @@ -0,0 +1,266 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "uv.h" +#include "task.h" + +#define INIT_CANCEL_INFO(ci, what) \ + do { \ + (ci)->reqs = (what); \ + (ci)->nreqs = ARRAY_SIZE(what); \ + (ci)->stride = sizeof((what)[0]); \ + } \ + while (0) + +struct cancel_info { + void* reqs; + unsigned nreqs; + unsigned stride; + uv_timer_t timer_handle; +}; + +static uv_cond_t signal_cond; +static uv_mutex_t signal_mutex; +static uv_mutex_t wait_mutex; +static unsigned num_threads; +static unsigned fs_cb_called; +static unsigned work_cb_called; +static unsigned done_cb_called; +static unsigned done2_cb_called; +static unsigned timer_cb_called; +static unsigned getaddrinfo_cb_called; + + +static void work_cb(uv_work_t* req) { + uv_mutex_lock(&signal_mutex); + uv_cond_signal(&signal_cond); + uv_mutex_unlock(&signal_mutex); + + uv_mutex_lock(&wait_mutex); + uv_mutex_unlock(&wait_mutex); + + work_cb_called++; +} + + +static void done_cb(uv_work_t* req, int status) { + done_cb_called++; + free(req); +} + + +static void saturate_threadpool(void) { + uv_work_t* req; + + ASSERT(0 == uv_cond_init(&signal_cond)); + ASSERT(0 == uv_mutex_init(&signal_mutex)); + ASSERT(0 == uv_mutex_init(&wait_mutex)); + + uv_mutex_lock(&signal_mutex); + uv_mutex_lock(&wait_mutex); + + for (num_threads = 0; /* empty */; num_threads++) { + req = malloc(sizeof(*req)); + ASSERT(req != NULL); + ASSERT(0 == uv_queue_work(uv_default_loop(), req, work_cb, done_cb)); + + /* Expect to get signalled within 350 ms, otherwise assume that + * the thread pool is saturated. As with any timing dependent test, + * this is obviously not ideal. + */ + if (uv_cond_timedwait(&signal_cond, &signal_mutex, 350 * 1e6)) { + ASSERT(0 == uv_cancel((uv_req_t*) req)); + break; + } + } +} + + +static void unblock_threadpool(void) { + uv_mutex_unlock(&signal_mutex); + uv_mutex_unlock(&wait_mutex); +} + + +static void cleanup_threadpool(void) { + ASSERT(done_cb_called == num_threads + 1); /* +1 == cancelled work req. */ + ASSERT(work_cb_called == num_threads); + + uv_cond_destroy(&signal_cond); + uv_mutex_destroy(&signal_mutex); + uv_mutex_destroy(&wait_mutex); +} + + +static void fs_cb(uv_fs_t* req) { + ASSERT(req->errorno == UV_ECANCELED); + uv_fs_req_cleanup(req); + fs_cb_called++; +} + + +static void getaddrinfo_cb(uv_getaddrinfo_t* req, + int status, + struct addrinfo* res) { + ASSERT(UV_ECANCELED == uv_last_error(req->loop).code); + ASSERT(UV_ECANCELED == status); + getaddrinfo_cb_called++; +} + + +static void work2_cb(uv_work_t* req) { + ASSERT(0 && "work2_cb called"); +} + + +static void done2_cb(uv_work_t* req, int status) { + ASSERT(uv_last_error(req->loop).code == UV_ECANCELED); + ASSERT(status == -1); + done2_cb_called++; +} + + +static void timer_cb(uv_timer_t* handle, int status) { + struct cancel_info* ci; + uv_req_t* req; + unsigned i; + + ci = container_of(handle, struct cancel_info, timer_handle); + + for (i = 0; i < ci->nreqs; i++) { + req = (uv_req_t*) ((char*) ci->reqs + i * ci->stride); + ASSERT(0 == uv_cancel(req)); + } + + uv_close((uv_handle_t*) &ci->timer_handle, NULL); + unblock_threadpool(); + timer_cb_called++; +} + + +TEST_IMPL(threadpool_cancel_getaddrinfo) { + uv_getaddrinfo_t reqs[4]; + struct cancel_info ci; + struct addrinfo hints; + uv_loop_t* loop; + int r; + + INIT_CANCEL_INFO(&ci, reqs); + loop = uv_default_loop(); + saturate_threadpool(); + + r = uv_getaddrinfo(loop, reqs + 0, getaddrinfo_cb, "fail", NULL, NULL); + ASSERT(r == 0); + + r = uv_getaddrinfo(loop, reqs + 1, getaddrinfo_cb, NULL, "fail", NULL); + ASSERT(r == 0); + + r = uv_getaddrinfo(loop, reqs + 2, getaddrinfo_cb, "fail", "fail", NULL); + ASSERT(r == 0); + + r = uv_getaddrinfo(loop, reqs + 3, getaddrinfo_cb, "fail", NULL, &hints); + ASSERT(r == 0); + + ASSERT(0 == uv_timer_init(loop, &ci.timer_handle)); + ASSERT(0 == uv_timer_start(&ci.timer_handle, timer_cb, 10, 0)); + ASSERT(0 == uv_run(loop)); + ASSERT(1 == timer_cb_called); + + cleanup_threadpool(); + + return 0; +} + + +TEST_IMPL(threadpool_cancel_work) { + struct cancel_info ci; + uv_work_t reqs[16]; + uv_loop_t* loop; + unsigned i; + + INIT_CANCEL_INFO(&ci, reqs); + loop = uv_default_loop(); + saturate_threadpool(); + + for (i = 0; i < ARRAY_SIZE(reqs); i++) + ASSERT(0 == uv_queue_work(loop, reqs + i, work2_cb, done2_cb)); + + ASSERT(0 == uv_timer_init(loop, &ci.timer_handle)); + ASSERT(0 == uv_timer_start(&ci.timer_handle, timer_cb, 10, 0)); + ASSERT(0 == uv_run(loop)); + ASSERT(1 == timer_cb_called); + ASSERT(ARRAY_SIZE(reqs) == done2_cb_called); + + cleanup_threadpool(); + + return 0; +} + + +TEST_IMPL(threadpool_cancel_fs) { + struct cancel_info ci; + uv_fs_t reqs[25]; + uv_loop_t* loop; + unsigned n; + + INIT_CANCEL_INFO(&ci, reqs); + loop = uv_default_loop(); + saturate_threadpool(); + + /* Needs to match ARRAY_SIZE(fs_reqs). */ + n = 0; + ASSERT(0 == uv_fs_chmod(loop, reqs + n++, "/", 0, fs_cb)); + ASSERT(0 == uv_fs_chown(loop, reqs + n++, "/", 0, 0, fs_cb)); + ASSERT(0 == uv_fs_close(loop, reqs + n++, 0, fs_cb)); + ASSERT(0 == uv_fs_fchmod(loop, reqs + n++, 0, 0, fs_cb)); + ASSERT(0 == uv_fs_fchown(loop, reqs + n++, 0, 0, 0, fs_cb)); + ASSERT(0 == uv_fs_fdatasync(loop, reqs + n++, 0, fs_cb)); + ASSERT(0 == uv_fs_fstat(loop, reqs + n++, 0, fs_cb)); + ASSERT(0 == uv_fs_fsync(loop, reqs + n++, 0, fs_cb)); + ASSERT(0 == uv_fs_ftruncate(loop, reqs + n++, 0, 0, fs_cb)); + ASSERT(0 == uv_fs_futime(loop, reqs + n++, 0, 0, 0, fs_cb)); + ASSERT(0 == uv_fs_link(loop, reqs + n++, "/", "/", fs_cb)); + ASSERT(0 == uv_fs_lstat(loop, reqs + n++, "/", fs_cb)); + ASSERT(0 == uv_fs_mkdir(loop, reqs + n++, "/", 0, fs_cb)); + ASSERT(0 == uv_fs_open(loop, reqs + n++, "/", 0, 0, fs_cb)); + ASSERT(0 == uv_fs_read(loop, reqs + n++, 0, NULL, 0, 0, fs_cb)); + ASSERT(0 == uv_fs_readdir(loop, reqs + n++, "/", 0, fs_cb)); + ASSERT(0 == uv_fs_readlink(loop, reqs + n++, "/", fs_cb)); + ASSERT(0 == uv_fs_rename(loop, reqs + n++, "/", "/", fs_cb)); + ASSERT(0 == uv_fs_mkdir(loop, reqs + n++, "/", 0, fs_cb)); + ASSERT(0 == uv_fs_sendfile(loop, reqs + n++, 0, 0, 0, 0, fs_cb)); + ASSERT(0 == uv_fs_stat(loop, reqs + n++, "/", fs_cb)); + ASSERT(0 == uv_fs_symlink(loop, reqs + n++, "/", "/", 0, fs_cb)); + ASSERT(0 == uv_fs_unlink(loop, reqs + n++, "/", fs_cb)); + ASSERT(0 == uv_fs_utime(loop, reqs + n++, "/", 0, 0, fs_cb)); + ASSERT(0 == uv_fs_write(loop, reqs + n++, 0, NULL, 0, 0, fs_cb)); + ASSERT(n == ARRAY_SIZE(reqs)); + + ASSERT(0 == uv_timer_init(loop, &ci.timer_handle)); + ASSERT(0 == uv_timer_start(&ci.timer_handle, timer_cb, 10, 0)); + ASSERT(0 == uv_run(loop)); + ASSERT(n == fs_cb_called); + ASSERT(1 == timer_cb_called); + + cleanup_threadpool(); + + return 0; +} diff --git a/deps/uv/test/test-threadpool.c b/deps/uv/test/test-threadpool.c index 12777b6e43c..bde9f472813 100644 --- a/deps/uv/test/test-threadpool.c +++ b/deps/uv/test/test-threadpool.c @@ -35,7 +35,8 @@ static void work_cb(uv_work_t* req) { } -static void after_work_cb(uv_work_t* req) { +static void after_work_cb(uv_work_t* req, int status) { + ASSERT(status == 0); ASSERT(req == &work_req); ASSERT(req->data == &data); after_work_cb_count++; @@ -56,3 +57,21 @@ TEST_IMPL(threadpool_queue_work_simple) { MAKE_VALGRIND_HAPPY(); return 0; } + + +TEST_IMPL(threadpool_queue_work_einval) { + int r; + + work_req.data = &data; + r = uv_queue_work(uv_default_loop(), &work_req, NULL, after_work_cb); + ASSERT(r == -1); + + uv_run(uv_default_loop()); + ASSERT(uv_last_error(uv_default_loop()).code == UV_EINVAL); + + ASSERT(work_cb_count == 0); + ASSERT(after_work_cb_count == 0); + + MAKE_VALGRIND_HAPPY(); + return 0; +} diff --git a/deps/uv/uv.gyp b/deps/uv/uv.gyp index 749decdea71..ac6f7f04ec9 100644 --- a/deps/uv/uv.gyp +++ b/deps/uv/uv.gyp @@ -250,6 +250,7 @@ 'test/test-cwd-and-chdir.c', 'test/test-delayed-accept.c', 'test/test-error.c', + 'test/test-embed.c', 'test/test-fail-always.c', 'test/test-fs.c', 'test/test-fs-event.c', @@ -298,7 +299,9 @@ 'test/test-tcp-write-to-half-open-connection.c', 'test/test-tcp-writealot.c', 'test/test-tcp-unexpected-read.c', + 'test/test-tcp-read-stop.c', 'test/test-threadpool.c', + 'test/test-threadpool-cancel.c', 'test/test-mutexes.c', 'test/test-thread.c', 'test/test-barrier.c', diff --git a/src/node_crypto.cc b/src/node_crypto.cc index 209d8800628..2777a1fb07e 100644 --- a/src/node_crypto.cc +++ b/src/node_crypto.cc @@ -3728,9 +3728,9 @@ void EIO_PBKDF2After(pbkdf2_req* req, Local argv[2]) { } -void EIO_PBKDF2After(uv_work_t* work_req) { +void EIO_PBKDF2After(uv_work_t* work_req, int status) { + assert(status == 0); pbkdf2_req* req = container_of(work_req, pbkdf2_req, work_req); - HandleScope scope; Local argv[2]; Persistent obj = req->obj; @@ -3902,16 +3902,15 @@ void RandomBytesCheck(RandomBytesRequest* req, Local argv[2]) { } -void RandomBytesAfter(uv_work_t* work_req) { +void RandomBytesAfter(uv_work_t* work_req, int status) { + assert(status == 0); RandomBytesRequest* req = container_of(work_req, RandomBytesRequest, work_req_); - HandleScope scope; Local argv[2]; RandomBytesCheck(req, argv); MakeCallback(req->obj_, "ondone", ARRAY_SIZE(argv), argv); - delete req; } diff --git a/src/node_zlib.cc b/src/node_zlib.cc index 8b6fd0c21d2..13f94e9020f 100644 --- a/src/node_zlib.cc +++ b/src/node_zlib.cc @@ -213,7 +213,9 @@ class ZCtx : public ObjectWrap { } // v8 land! - static void After(uv_work_t* work_req) { + static void After(uv_work_t* work_req, int status) { + assert(status == 0); + HandleScope scope; ZCtx *ctx = container_of(work_req, ZCtx, work_req_);