Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

base fork: joyent/libuv
...
head fork: joyent/libuv
  • 6 commits
  • 10 files changed
  • 0 commit comments
  • 1 contributor
Commits on Feb 24, 2013
@bnoordhuis unix, windows: add debug function uv_print_reqs() 072b7e3
@bnoordhuis unix: implement a better threadpool
Split the threadpool in two: a small one for CPU-bound workloads and
a large one for I/O-bound workloads.

The CPU threadpool has approximately as many threads as there are CPUs
in the machine.

The I/O threadpool can have hundreds of threads, which is okay because
they should mostly be blocked in system calls.

I/O threads run on a reduced stack (32 kB) to avoid using up too much
virtual address space on 32-bit architectures.
622de05
@bnoordhuis test: fix 'buffer too small' fmt() warning 1f560b6
@bnoordhuis bench: fix up fs_stat 303fe67
@bnoordhuis unix: make threadpool spiffier 9703ab4
@bnoordhuis test: remove 'is root?' check
I debug tests regularly as root (because dtrace and dtruss require
additional privileges). The 'is root?' check gets in the way more often
than it prevents me from doing something silly. Remove it.
6fba593
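
A side note on the threadpool commit, which is the heart of this series: it describes two sizing rules, a CPU pool with roughly one thread per core and an I/O pool whose threads run on small stacks so that hundreds of them stay affordable in a 32-bit address space. The following is a minimal standalone pthreads sketch of those two rules, not libuv code; the 32 kB figure is simply the one quoted in the commit message. The real implementation is in the src/unix/threadpool.c diff below.

    #include <limits.h>    /* PTHREAD_STACK_MIN */
    #include <pthread.h>
    #include <stdio.h>
    #include <unistd.h>    /* sysconf() */

    static void* io_worker(void* arg) {
      (void) arg;
      /* A real I/O worker would spend most of its life blocked in
       * system calls, which is why a small stack is enough. */
      return NULL;
    }

    int main(void) {
      pthread_attr_t attr;
      pthread_t thread;
      size_t stack_size;
      long ncpus;

      /* CPU pool: roughly one thread per online CPU. */
      ncpus = sysconf(_SC_NPROCESSORS_ONLN);
      if (ncpus <= 0)
        ncpus = 1;
      printf("CPU pool size: %ld\n", ncpus);

      /* I/O pool: small stacks, clamped to the platform minimum. */
      stack_size = 32768;
      if (stack_size < PTHREAD_STACK_MIN)
        stack_size = PTHREAD_STACK_MIN;

      if (pthread_attr_init(&attr))
        return 1;
      if (pthread_attr_setstacksize(&attr, stack_size))
        return 1;
      if (pthread_create(&thread, &attr, io_worker, NULL))
        return 1;
      pthread_attr_destroy(&attr);
      pthread_join(thread, NULL);

      return 0;
    }
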
1  include/uv-private/uv-unix.h
@@ -83,6 +83,7 @@ struct uv__work {
void (*work)(struct uv__work *w);
void (*done)(struct uv__work *w, int status);
struct uv_loop_s* loop;
+ void* thread_ctx;
ngx_queue_t wq;
};
77 src/unix/fs.c
@@ -82,10 +82,15 @@
} \
while (0)
-#define POST \
+#define POST(hint) \
do { \
if ((cb) != NULL) { \
- uv__work_submit((loop), &(req)->work_req, uv__fs_work, uv__fs_done); \
+ uv__work_submit((loop), \
+ &(req)->work_req, \
+ uv__fs_work, \
+ uv__fs_done, \
+ UV__THREADPOOL_IO, \
+ (hint)); \
return 0; \
} \
else { \
@@ -579,6 +584,24 @@ static void uv__fs_done(struct uv__work* w, int status) {
}
+/* Jenkins hash. */
+static long hash(const char* s) {
+ long h;
+
+ for (h = 0; *s != '\0'; s++) {
+ h += *s;
+ h += h << 10;
+ h ^= h >> 6;
+ }
+
+ h += h << 3;
+ h ^= h >> 1;
+ h += h << 15;
+
+ return h;
+}
+
+
int uv_fs_chmod(uv_loop_t* loop,
uv_fs_t* req,
const char* path,
@@ -587,7 +610,7 @@ int uv_fs_chmod(uv_loop_t* loop,
INIT(CHMOD);
PATH;
req->mode = mode;
- POST;
+ POST(hash(path));
}
@@ -601,14 +624,14 @@ int uv_fs_chown(uv_loop_t* loop,
PATH;
req->uid = uid;
req->gid = gid;
- POST;
+ POST(hash(path));
}
int uv_fs_close(uv_loop_t* loop, uv_fs_t* req, uv_file file, uv_fs_cb cb) {
INIT(CLOSE);
req->file = file;
- POST;
+ POST(file);
}
@@ -620,7 +643,7 @@ int uv_fs_fchmod(uv_loop_t* loop,
INIT(FCHMOD);
req->file = file;
req->mode = mode;
- POST;
+ POST(file);
}
@@ -634,28 +657,28 @@ int uv_fs_fchown(uv_loop_t* loop,
req->file = file;
req->uid = uid;
req->gid = gid;
- POST;
+ POST(file);
}
int uv_fs_fdatasync(uv_loop_t* loop, uv_fs_t* req, uv_file file, uv_fs_cb cb) {
INIT(FDATASYNC);
req->file = file;
- POST;
+ POST(file);
}
int uv_fs_fstat(uv_loop_t* loop, uv_fs_t* req, uv_file file, uv_fs_cb cb) {
INIT(FSTAT);
req->file = file;
- POST;
+ POST(file);
}
int uv_fs_fsync(uv_loop_t* loop, uv_fs_t* req, uv_file file, uv_fs_cb cb) {
INIT(FSYNC);
req->file = file;
- POST;
+ POST(file);
}
@@ -667,7 +690,7 @@ int uv_fs_ftruncate(uv_loop_t* loop,
INIT(FTRUNCATE);
req->file = file;
req->off = off;
- POST;
+ POST(file);
}
@@ -681,14 +704,14 @@ int uv_fs_futime(uv_loop_t* loop,
req->file = file;
req->atime = atime;
req->mtime = mtime;
- POST;
+ POST(file);
}
int uv_fs_lstat(uv_loop_t* loop, uv_fs_t* req, const char* path, uv_fs_cb cb) {
INIT(LSTAT);
PATH;
- POST;
+ POST(hash(path));
}
@@ -699,7 +722,7 @@ int uv_fs_link(uv_loop_t* loop,
uv_fs_cb cb) {
INIT(LINK);
PATH2;
- POST;
+ POST(hash(path)); /* TODO Take both path and new_path into account. */
}
@@ -711,7 +734,7 @@ int uv_fs_mkdir(uv_loop_t* loop,
INIT(MKDIR);
PATH;
req->mode = mode;
- POST;
+ POST(hash(path));
}
@@ -725,7 +748,7 @@ int uv_fs_open(uv_loop_t* loop,
PATH;
req->flags = flags;
req->mode = mode;
- POST;
+ POST(hash(path));
}
@@ -740,7 +763,7 @@ int uv_fs_read(uv_loop_t* loop, uv_fs_t* req,
req->buf = buf;
req->len = len;
req->off = off;
- POST;
+ POST(file);
}
@@ -752,7 +775,7 @@ int uv_fs_readdir(uv_loop_t* loop,
INIT(READDIR);
PATH;
req->flags = flags;
- POST;
+ POST(hash(path));
}
@@ -762,7 +785,7 @@ int uv_fs_readlink(uv_loop_t* loop,
uv_fs_cb cb) {
INIT(READLINK);
PATH;
- POST;
+ POST(hash(path));
}
@@ -773,14 +796,14 @@ int uv_fs_rename(uv_loop_t* loop,
uv_fs_cb cb) {
INIT(RENAME);
PATH2;
- POST;
+ POST(hash(path)); /* TODO Take both path and new_path into account. */
}
int uv_fs_rmdir(uv_loop_t* loop, uv_fs_t* req, const char* path, uv_fs_cb cb) {
INIT(RMDIR);
PATH;
- POST;
+ POST(hash(path));
}
@@ -796,14 +819,14 @@ int uv_fs_sendfile(uv_loop_t* loop,
req->file = out_fd;
req->off = off;
req->len = len;
- POST;
+ POST(in_fd); /* TODO Take both in_fd and out_fd into account. */
}
int uv_fs_stat(uv_loop_t* loop, uv_fs_t* req, const char* path, uv_fs_cb cb) {
INIT(STAT);
PATH;
- POST;
+ POST(hash(path));
}
@@ -816,14 +839,14 @@ int uv_fs_symlink(uv_loop_t* loop,
INIT(SYMLINK);
PATH2;
req->flags = flags;
- POST;
+ POST(hash(path)); /* TODO Take both path and new_path into account. */
}
int uv_fs_unlink(uv_loop_t* loop, uv_fs_t* req, const char* path, uv_fs_cb cb) {
INIT(UNLINK);
PATH;
- POST;
+ POST(hash(path));
}
@@ -837,7 +860,7 @@ int uv_fs_utime(uv_loop_t* loop,
PATH;
req->atime = atime;
req->mtime = mtime;
- POST;
+ POST(hash(path));
}
@@ -853,7 +876,7 @@ int uv_fs_write(uv_loop_t* loop,
req->buf = buf;
req->len = len;
req->off = off;
- POST;
+ POST(file);
}
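
A note on the pattern above: POST(hash(path)) and POST(file) feed a per-request hint into uv__work_submit(), which reduces it modulo the current thread count of the I/O pool. The apparent intent, an interpretation rather than something the commit states, is that requests touching the same path or descriptor land on the same worker thread and therefore execute in submission order. Below is a standalone demo of the mapping, reusing the hash() body from the diff above; the pool size of 4 is an arbitrary example.

    #include <stdio.h>

    /* hash() copied from the src/unix/fs.c diff above. */
    static long hash(const char* s) {
      long h;

      for (h = 0; *s != '\0'; s++) {
        h += *s;
        h += h << 10;
        h ^= h >> 6;
      }

      h += h << 3;
      h ^= h >> 1;
      h += h << 15;

      return h;
    }

    int main(void) {
      const char* paths[] = { "/tmp/a", "/tmp/b", "/tmp/a" };
      long nthreads = 4;  /* arbitrary example pool size */
      unsigned int i;
      long hint;

      for (i = 0; i < sizeof(paths) / sizeof(paths[0]); i++) {
        hint = hash(paths[i]) % nthreads;
        if (hint < 0)
          hint = -hint;  /* mirror the sign fix-up in uv__work_submit() */
        printf("%s -> worker %ld\n", paths[i], hint);
      }

      /* "/tmp/a" maps to the same worker both times. */
      return 0;
    }
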
4 src/unix/getaddrinfo.c
@@ -147,7 +147,9 @@ int uv_getaddrinfo(uv_loop_t* loop,
uv__work_submit(loop,
&req->work_req,
uv__getaddrinfo_work,
- uv__getaddrinfo_done);
+ uv__getaddrinfo_done,
+ UV__THREADPOOL_IO,
+ -1);
return 0;
}
22 src/unix/internal.h
@@ -45,6 +45,9 @@
# include <CoreServices/CoreServices.h>
#endif
+#define ACCESS_ONCE(type, var) \
+ (*(volatile type*) &(var))
+
#define UNREACHABLE() \
do { \
assert(0 && "unreachable code"); \
@@ -90,6 +93,21 @@
# define UV__POLLHUP 8
#endif
+/* There are roughly as many CPU threads as there are CPUs in the machine.
+ *
+ * I/O threads have no such limitation, there can be hundreds of them.
+ * They're intended for workloads where most time is spent blocked inside
+ * system calls.
+ *
+ * Note that I/O threads run on a reduced stack to avoid consuming too much
+ * address space on 32-bit machines.
+ */
+enum {
+ UV__THREADPOOL_CPU = 0,
+ UV__THREADPOOL_IO = 1,
+ UV__THREADPOOL_MAX
+};
+
/* handle flags */
enum {
UV_CLOSING = 0x01, /* uv_close() called but not finished. */
@@ -162,7 +180,9 @@ void uv__signal_loop_cleanup(uv_loop_t* loop);
void uv__work_submit(uv_loop_t* loop,
struct uv__work *w,
void (*work)(struct uv__work *w),
- void (*done)(struct uv__work *w, int status));
+ void (*done)(struct uv__work *w, int status),
+ unsigned int type,
+ long hint);
void uv__work_done(uv_async_t* handle, int status);
/* platform specific */
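
The new ACCESS_ONCE macro is what makes the unlocked fast-path check in threadpool_grow() (see the threadpool.c diff below) defensible: the volatile cast forces a fresh load from memory on every use instead of letting the compiler reuse a cached register value, and, as that diff's own comment notes, a naturally aligned integer can be read without tearing. A minimal sketch of the pattern:

    #include <stdio.h>

    #define ACCESS_ONCE(type, var) \
      (*(volatile type*) &(var))

    /* Written under a mutex by other threads; read without one here. */
    static unsigned int cur_threads;

    int main(void) {
      unsigned int snapshot;

      /* The volatile access guarantees a real load; without it the
       * compiler may hoist the read or reuse a stale register copy. */
      snapshot = ACCESS_ONCE(unsigned int, cur_threads);
      printf("threads right now: %u\n", snapshot);
      return 0;
    }
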
247 src/unix/threadpool.c
@@ -21,13 +21,28 @@
#include "internal.h"
#include <stdlib.h>
-
+#include <limits.h> /* PTHREAD_STACK_MIN */
+#include <unistd.h> /* sysconf() */
+
+struct thread_ctx {
+ uv_cond_t cond;
+ uv_mutex_t mutex;
+ ngx_queue_t work_queue;
+ pthread_t thread;
+ int quit;
+};
+
+struct threadpool_ctx {
+ uv_cond_t cond;
+ uv_mutex_t mutex;
+ unsigned int stack_size;
+ unsigned int cur_threads;
+ unsigned int max_threads;
+ struct thread_ctx thread_contexts[1]; /* Variable length. */
+};
+
+static struct threadpool_ctx* threadpools[UV__THREADPOOL_MAX];
static uv_once_t once = UV_ONCE_INIT;
-static uv_cond_t cond;
-static uv_mutex_t mutex;
-static uv_thread_t threads[4];
-static ngx_queue_t exit_message;
-static ngx_queue_t wq;
static volatile int initialized;
@@ -39,34 +54,31 @@ static void uv__cancelled(struct uv__work* w) {
/* To avoid deadlock with uv_cancel() it's crucial that the worker
* never holds the global mutex and the loop-local mutex at the same time.
*/
-static void worker(void* arg) {
+static void* worker(void* arg) {
+ struct thread_ctx* tc;
struct uv__work* w;
ngx_queue_t* q;
- (void) arg;
+ tc = arg;
for (;;) {
- uv_mutex_lock(&mutex);
-
- while (ngx_queue_empty(&wq))
- uv_cond_wait(&cond, &mutex);
+ uv_mutex_lock(&tc->mutex);
- q = ngx_queue_head(&wq);
+ while (tc->quit == 0 && ngx_queue_empty(&tc->work_queue))
+ uv_cond_wait(&tc->cond, &tc->mutex);
- if (q == &exit_message)
- uv_cond_signal(&cond);
- else {
- ngx_queue_remove(q);
- ngx_queue_init(q); /* Signal uv_cancel() that the work req is
- executing. */
+ if (tc->quit != 0 && ngx_queue_empty(&tc->work_queue)) {
+ uv_mutex_unlock(&tc->mutex);
+ return NULL;
}
- uv_mutex_unlock(&mutex);
-
- if (q == &exit_message)
- break;
+ q = ngx_queue_head(&tc->work_queue);
+ ngx_queue_remove(q);
+ ngx_queue_init(q); /* Signal uv_cancel() that the work req is executing. */
+ uv_mutex_unlock(&tc->mutex);
w = ngx_queue_data(q, struct uv__work, wq);
+ assert(w->thread_ctx == tc);
w->work(w);
uv_mutex_lock(&w->loop->wq_mutex);
@@ -76,36 +88,137 @@ static void worker(void* arg) {
uv_async_send(&w->loop->wq_async);
uv_mutex_unlock(&w->loop->wq_mutex);
}
+
+ UNREACHABLE();
+ return NULL;
}
-static void post(ngx_queue_t* q) {
- uv_mutex_lock(&mutex);
- ngx_queue_insert_tail(&wq, q);
- uv_cond_signal(&cond);
- uv_mutex_unlock(&mutex);
+static struct threadpool_ctx* threadpool_new(unsigned int max_threads,
+ unsigned int stack_size) {
+ struct threadpool_ctx* ctx;
+
+ ctx = malloc(sizeof(*ctx) +
+ sizeof(ctx->thread_contexts[0]) * (max_threads - 1));
+
+ if (ctx == NULL)
+ abort();
+
+ if (uv_cond_init(&ctx->cond))
+ abort();
+
+ if (uv_mutex_init(&ctx->mutex))
+ abort();
+
+ if (stack_size < PTHREAD_STACK_MIN)
+ stack_size = PTHREAD_STACK_MIN;
+
+ ctx->stack_size = stack_size;
+ ctx->cur_threads = 0;
+ ctx->max_threads = max_threads;
+
+ return ctx;
}
-static void init_once(void) {
+static void threadpool_destroy(struct threadpool_ctx* ctx) {
+ struct thread_ctx* tc;
unsigned int i;
- if (uv_cond_init(&cond))
+ uv_mutex_lock(&ctx->mutex);
+
+ for (i = 0; i < ctx->cur_threads; i++) {
+ tc = ctx->thread_contexts + i;
+
+ uv_mutex_lock(&tc->mutex);
+ tc->quit = 1;
+ uv_cond_signal(&tc->cond);
+ uv_mutex_unlock(&tc->mutex);
+
+ if (pthread_join(tc->thread, NULL))
+ abort();
+
+ uv_mutex_destroy(&tc->mutex);
+ uv_cond_destroy(&tc->cond);
+ }
+
+ uv_mutex_unlock(&ctx->mutex);
+ uv_mutex_destroy(&ctx->mutex);
+ uv_cond_destroy(&ctx->cond);
+ free(ctx);
+}
+
+
+static void threadpool_grow(struct threadpool_ctx* ctx) {
+ struct thread_ctx* tc;
+ pthread_attr_t attr;
+ size_t stack_size;
+
+ /* Cheap but safe check: max_threads is immutable, cur_threads is a naturally
+ * aligned integer and doesn't need locking to read. Worst case, we first see
+ * that cur_threads < max_threads, acquire the lock, discover that another
+ * thread has preempted us and that cur_threads == max_threads now.
+ */
+ if (ACCESS_ONCE(unsigned int, ctx->cur_threads) == ctx->max_threads)
+ return;
+
+ stack_size = ctx->stack_size;
+
+ uv_mutex_lock(&ctx->mutex);
+
+ if (ctx->cur_threads == ctx->max_threads)
+ goto out;
+
+ tc = ctx->thread_contexts + ctx->cur_threads;
+ tc->quit = 0;
+ ngx_queue_init(&tc->work_queue);
+
+ if (uv_cond_init(&tc->cond))
abort();
- if (uv_mutex_init(&mutex))
+ if (uv_mutex_init(&tc->mutex))
abort();
- ngx_queue_init(&wq);
+ if (pthread_attr_init(&attr))
+ abort();
- for (i = 0; i < ARRAY_SIZE(threads); i++)
- if (uv_thread_create(threads + i, worker, NULL))
+ if (stack_size > 0)
+ if (pthread_attr_setstacksize(&attr, stack_size))
abort();
+ if (pthread_create(&tc->thread, &attr, worker, tc))
+ abort();
+
+ if (pthread_attr_destroy(&attr))
+ abort();
+
+ ctx->cur_threads++;
+
+out:
+
+ uv_mutex_unlock(&ctx->mutex);
+}
+
+
+static void init_once(void) {
+ long numcpus;
+
+ numcpus = sysconf(_SC_NPROCESSORS_ONLN);
+ if (numcpus <= 0)
+ numcpus = 1;
+
+ threadpools[UV__THREADPOOL_CPU] = threadpool_new(numcpus, 0);
+ threadpools[UV__THREADPOOL_IO] = threadpool_new(numcpus, 32768);
initialized = 1;
}
+/* This destructor is here mostly to please Valgrind. It is up for debate
+ * whether draining the thread pools after main() has returned is a good
+ * thing: any global or shared state the work and done callbacks rely on
+ * likely gone by now. If it turns out to be a problem, we'll hide it behind
+ * a Valgrind-only #define.
+ */
#if defined(__GNUC__)
__attribute__((destructor))
static void cleanup(void) {
@@ -114,14 +227,11 @@ static void cleanup(void) {
if (initialized == 0)
return;
- post(&exit_message);
-
- for (i = 0; i < ARRAY_SIZE(threads); i++)
- if (uv_thread_join(threads + i))
- abort();
+ for (i = 0; i < ARRAY_SIZE(threadpools); i++) {
+ threadpool_destroy(threadpools[i]);
+ threadpools[i] = NULL;
+ }
- uv_mutex_destroy(&mutex);
- uv_cond_destroy(&cond);
initialized = 0;
}
#endif
@@ -130,19 +240,56 @@ static void cleanup(void) {
void uv__work_submit(uv_loop_t* loop,
struct uv__work* w,
void (*work)(struct uv__work* w),
- void (*done)(struct uv__work* w, int status)) {
+ void (*done)(struct uv__work* w, int status),
+ unsigned int type,
+ long hint) {
+ struct threadpool_ctx* ctx;
+ struct thread_ctx* tc;
+
+ assert(type < ARRAY_SIZE(threadpools));
uv_once(&once, init_once);
w->loop = loop;
w->work = work;
w->done = done;
- post(&w->wq);
+
+ ctx = threadpools[type];
+ threadpool_grow(ctx); /* TODO Only grow when all threads are busy. */
+
+ if (hint == -1) {
+ /* TODO Find the least loaded worker thread. */
+ hint = 0;
+ }
+ else {
+ hint %= ACCESS_ONCE(unsigned int, ctx->cur_threads);
+
+ /* Sign of remainder with signed modulus depends on implementation.
+ * Trust that the compiler is smart enough to optimize away the
+ * comparison when it's positive.
+ */
+ if (hint < 0)
+ hint = -hint;
+ }
+
+ assert(hint >= 0);
+ assert(hint < (int) ctx->cur_threads);
+
+ tc = w->thread_ctx = ctx->thread_contexts + hint;
+
+ uv_mutex_lock(&tc->mutex);
+ ngx_queue_insert_tail(&tc->work_queue, &w->wq);
+ uv_cond_signal(&tc->cond);
+ uv_mutex_unlock(&tc->mutex);
}
-static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
+static int uv__work_cancel(uv_loop_t* loop,
+ uv_req_t* req,
+ struct uv__work* w) {
+ struct thread_ctx* tc;
int cancelled;
- uv_mutex_lock(&mutex);
+ tc = w->thread_ctx;
+ uv_mutex_lock(&tc->mutex);
uv_mutex_lock(&w->loop->wq_mutex);
cancelled = !ngx_queue_empty(&w->wq) && w->work != NULL;
@@ -150,7 +297,7 @@ static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
ngx_queue_remove(&w->wq);
uv_mutex_unlock(&w->loop->wq_mutex);
- uv_mutex_unlock(&mutex);
+ uv_mutex_unlock(&tc->mutex);
if (!cancelled)
return -1;
@@ -227,7 +374,13 @@ int uv_queue_work(uv_loop_t* loop,
req->loop = loop;
req->work_cb = work_cb;
req->after_work_cb = after_work_cb;
- uv__work_submit(loop, &req->work_req, uv__queue_work, uv__queue_done);
+ uv__work_submit(loop,
+ &req->work_req,
+ uv__queue_work,
+ uv__queue_done,
+ UV__THREADPOOL_CPU,
+ (long) req >> 4);
+
return 0;
}
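
One detail worth calling out in the uv_queue_work() hunk: the hint is the request pointer shifted right by four bits. Heap allocations are typically at least 16-byte aligned, so the low four bits of the pointer are always zero; shifting them off before the modulo keeps every request from mapping to worker 0. That reading is an interpretation, not something the commit states. A small demo of the effect:

    #include <stdio.h>
    #include <stdlib.h>

    int main(void) {
      long nthreads = 4;  /* example pool size */
      void* reqs[4];
      long hint;
      int i;

      for (i = 0; i < 4; i++)
        reqs[i] = malloc(64);  /* stand-ins for uv_work_t requests */

      for (i = 0; i < 4; i++) {
        hint = ((long) reqs[i] >> 4) % nthreads;
        if (hint < 0)
          hint = -hint;
        /* Without the shift, 16-byte alignment would make the
         * unshifted pointer hash to worker 0 every time. */
        printf("req %p -> worker %ld (unshifted: %ld)\n",
               reqs[i], hint, (long) reqs[i] % nthreads);
      }

      for (i = 0; i < 4; i++)
        free(reqs[i]);

      return 0;
    }
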
35 src/uv-common.c
@@ -326,7 +326,30 @@ void uv_walk(uv_loop_t* loop, uv_walk_cb walk_cb, void* arg) {
#ifndef NDEBUG
-static void uv__print_handles(uv_loop_t* loop, int only_active) {
+void uv_print_reqs(uv_loop_t* loop) {
+ const char* type;
+ ngx_queue_t* q;
+ uv_req_t* req;
+
+ if (loop == NULL)
+ loop = uv_default_loop();
+
+ ngx_queue_foreach(q, &loop->active_reqs) {
+ req = ngx_queue_data(q, uv_req_t, active_queue);
+
+ switch (req->type) {
+#define X(uc, lc) case UV_##uc: type = #lc; break;
+ UV_REQ_TYPE_MAP(X)
+#undef X
+ default: type = "<unknown>";
+ }
+
+ fprintf(stderr, "%p %s\n", (void*) req, type);
+ }
+}
+
+
+void uv_print_handles(uv_loop_t* loop, int only_active) {
const char* type;
ngx_queue_t* q;
uv_handle_t* h;
@@ -356,16 +379,6 @@ static void uv__print_handles(uv_loop_t* loop, int only_active) {
(void*)h);
}
}
-
-
-void uv_print_all_handles(uv_loop_t* loop) {
- uv__print_handles(loop, 0);
-}
-
-
-void uv_print_active_handles(uv_loop_t* loop) {
- uv__print_handles(loop, 1);
-}
#endif
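
uv_print_reqs() and uv_print_handles() are compiled only when NDEBUG is undefined, and uv_print_reqs() is not declared in the public headers. The expected use is presumably from a debugger (for example `call uv_print_reqs(0)` in gdb) or with a hand-written declaration, as in this hypothetical snippet, which needs to link against a debug build of libuv:

    #include "uv.h"

    /* Not declared in uv.h; hand-declared here for illustration. */
    void uv_print_reqs(uv_loop_t* loop);

    int main(void) {
      /* NULL means "use uv_default_loop()", per the definition above.
       * Prints one "<address> <type>" line per active request. */
      uv_print_reqs(NULL);
      return 0;
    }
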
66 test/benchmark-fs-stat.c
@@ -25,68 +25,21 @@
#include <stdio.h>
#include <stdlib.h>
-#define NUM_SYNC_REQS (10 * 1e5)
#define NUM_ASYNC_REQS (1 * (int) 1e5)
#define MAX_CONCURRENT_REQS 32
-#define sync_stat(req, path) \
- do { \
- uv_fs_stat(uv_default_loop(), (req), (path), NULL); \
- uv_fs_req_cleanup((req)); \
- } \
- while (0)
-
struct async_req {
- const char* path;
uv_fs_t fs_req;
- int* count;
+ uv_file file;
+ int count;
};
-static void warmup(const char* path) {
- uv_fs_t reqs[MAX_CONCURRENT_REQS];
- unsigned int i;
-
- /* warm up the thread pool */
- for (i = 0; i < ARRAY_SIZE(reqs); i++)
- uv_fs_stat(uv_default_loop(), reqs + i, path, uv_fs_req_cleanup);
-
- uv_run(uv_default_loop(), UV_RUN_DEFAULT);
-
- /* warm up the OS dirent cache */
- for (i = 0; i < 16; i++)
- sync_stat(reqs + 0, path);
-}
-
-
-static void sync_bench(const char* path) {
- uint64_t before;
- uint64_t after;
- uv_fs_t req;
- int i;
-
- /* do the sync benchmark */
- before = uv_hrtime();
-
- for (i = 0; i < NUM_SYNC_REQS; i++)
- sync_stat(&req, path);
-
- after = uv_hrtime();
-
- printf("%s stats (sync): %.2fs (%s/s)\n",
- fmt(1.0 * NUM_SYNC_REQS),
- (after - before) / 1e9,
- fmt((1.0 * NUM_SYNC_REQS) / ((after - before) / 1e9)));
- fflush(stdout);
-}
-
-
static void stat_cb(uv_fs_t* fs_req) {
struct async_req* req = container_of(fs_req, struct async_req, fs_req);
uv_fs_req_cleanup(&req->fs_req);
- if (*req->count == 0) return;
- uv_fs_stat(uv_default_loop(), &req->fs_req, req->path, stat_cb);
- (*req->count)--;
+ if (--req->count <= 0) return;
+ uv_fs_fstat(uv_default_loop(), &req->fs_req, req->file, stat_cb);
}
@@ -95,16 +48,13 @@ static void async_bench(const char* path) {
struct async_req* req;
uint64_t before;
uint64_t after;
- int count;
int i;
for (i = 1; i <= MAX_CONCURRENT_REQS; i++) {
- count = NUM_ASYNC_REQS;
-
for (req = reqs; req < reqs + i; req++) {
- req->path = path;
- req->count = &count;
- uv_fs_stat(uv_default_loop(), &req->fs_req, req->path, stat_cb);
+ req->file = open("/dev/null", O_RDONLY);
+ req->count = NUM_ASYNC_REQS / i;
+ uv_fs_fstat(uv_default_loop(), &req->fs_req, req->file, stat_cb);
}
before = uv_hrtime();
@@ -128,8 +78,6 @@ static void async_bench(const char* path) {
*/
BENCHMARK_IMPL(fs_stat) {
const char path[] = ".";
- warmup(path);
- sync_bench(path);
async_bench(path);
MAKE_VALGRIND_HAPPY();
return 0;
10 test/runner-unix.c
@@ -41,15 +41,9 @@
/* Do platform-specific initialization. */
void platform_init(int argc, char **argv) {
- const char* var = getenv("UV_RUN_AS_ROOT");
- const char* tap = getenv("UV_TAP_OUTPUT");
-
- /* Running the tests as root is not smart - don't do it. */
- if (getuid() == 0 && (var == NULL || atoi(var) <= 0)) {
- fprintf(stderr, "Running the tests as root is not safe.\n");
- exit(1);
- }
+ const char* tap;
+ tap = getenv("UV_TAP_OUTPUT");
tap_output = (tap != NULL && atoi(tap) > 0);
/* Disable stdio output buffering. */
2  test/runner.c
@@ -41,7 +41,7 @@ static void log_progress(int total, int passed, int failed, const char* name) {
const char* fmt(double d) {
- static char buf[1024];
+ static char buf[4096];
static char* p;
uint64_t v;
99 test/test-threadpool-cancel.c
@@ -49,6 +49,102 @@ static unsigned timer_cb_called;
static unsigned getaddrinfo_cb_called;
+#if defined(_WIN32)
+
+static void saturate_io_threadpool(void) { }
+static void unblock_io_threadpool(void) { }
+
+#else /* !defined(_WIN32) */
+
+/* uv-unix effectively has two threadpools: one for CPU-bound tasks and one
+ * for I/O tasks. We need some special handling to saturate the second one.
+ */
+#include <sys/time.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <string.h>
+#include <fcntl.h>
+#include <errno.h>
+
+struct io_task {
+ ngx_queue_t queue;
+ uv_fs_t fs_req;
+ int pipefd[2];
+ char buf[1];
+};
+
+static ngx_queue_t io_tasks;
+
+
+static void io_task_cleanup(struct io_task* task) {
+ uv_fs_req_cleanup(&task->fs_req);
+ ngx_queue_remove(&task->queue);
+ ngx_queue_init(&task->queue);
+ free(task);
+}
+
+
+static void io_read_cb(uv_fs_t* req) {
+ struct io_task* task;
+
+ task = container_of(req, struct io_task, fs_req);
+ io_task_cleanup(task);
+}
+
+
+static void saturate_io_threadpool(void) {
+ struct io_task* tasks[32];
+ struct io_task* task;
+ uv_loop_t* loop;
+ int cancelled;
+ unsigned i;
+
+ ngx_queue_init(&io_tasks);
+ loop = uv_default_loop();
+
+ for (;;) {
+ for (i = 0; i < ARRAY_SIZE(tasks); i++) {
+ task = calloc(1, sizeof(*task));
+ ASSERT(task != NULL);
+ ASSERT(0 == pipe(task->pipefd));
+ ASSERT(0 == uv_fs_read(loop,
+ &task->fs_req,
+ task->pipefd[0],
+ task->buf,
+ sizeof(task->buf),
+ -1,
+ io_read_cb));
+ ngx_queue_insert_tail(&io_tasks, &task->queue);
+ tasks[i] = task;
+ }
+
+ uv_sleep(250);
+
+ cancelled = 0;
+ for (i = 0; i < ARRAY_SIZE(tasks); i++)
+ if (uv_cancel((uv_req_t*) &tasks[i]->fs_req) == 0)
+ cancelled++;
+
+ if (cancelled != 0)
+ return;
+ }
+}
+
+
+static void unblock_io_threadpool(void) {
+ struct io_task* task;
+ ngx_queue_t* q;
+
+ ngx_queue_foreach(q, &io_tasks) {
+ task = ngx_queue_data(q, struct io_task, queue);
+ close(task->pipefd[0]);
+ close(task->pipefd[1]);
+ }
+}
+
+#endif /* defined(_WIN32) */
+
+
static void work_cb(uv_work_t* req) {
uv_mutex_lock(&signal_mutex);
uv_cond_signal(&signal_cond);
@@ -91,12 +187,15 @@ static void saturate_threadpool(void) {
break;
}
}
+
+ saturate_io_threadpool();
}
static void unblock_threadpool(void) {
uv_mutex_unlock(&signal_mutex);
uv_mutex_unlock(&wait_mutex);
+ unblock_io_threadpool();
}
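
The unblock path at the end relies on standard pipe semantics: when the last write end of a pipe is closed, a read() blocked on the read end returns 0 (EOF), which completes the queued uv_fs_read() jobs and frees the I/O workers. A single-threaded sketch of just that behavior, separate from the test:

    #include <stdio.h>
    #include <unistd.h>

    int main(void) {
      int fds[2];
      char c;
      ssize_t n;

      if (pipe(fds))
        return 1;

      close(fds[1]);            /* close the only write end...      */
      n = read(fds[0], &c, 1);  /* ...so this returns 0 immediately */
      printf("read returned %zd (EOF)\n", n);
      close(fds[0]);
      return 0;
    }
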
