diff --git a/include/uv.h b/include/uv.h index 483a174282..bcb9985066 100644 --- a/include/uv.h +++ b/include/uv.h @@ -1372,6 +1372,13 @@ UV_EXTERN int uv_work_queue(uv_loop_t* loop, uv_work_cb work_cb, uv_after_work_cb after_work_cb); +/* Cancel a pending work request. Fails if the work request is executing or + * has finished executing. + * + * Returns 0 on success, -1 on error. The loop error code is not touched. + */ +UV_EXTERN int uv_work_cancel(uv_work_t* req); + struct uv_cpu_info_s { char* model; diff --git a/src/unix/threadpool.c b/src/unix/threadpool.c index 68b2a78980..243768ddfd 100644 --- a/src/unix/threadpool.c +++ b/src/unix/threadpool.c @@ -30,6 +30,9 @@ static ngx_queue_t wq = { &wq, &wq }; static volatile int initialized; +/* To avoid deadlock with uv_work_cancel() it's crucial that the worker + * never holds the global mutex and the loop-local mutex at the same time. + */ static void worker(void* arg) { struct uv__work* w; ngx_queue_t* q; @@ -46,8 +49,11 @@ static void worker(void* arg) { if (q == &exit_message) uv_cond_signal(&cond); - else + else { ngx_queue_remove(q); + ngx_queue_init(q); /* Signal uv_work_cancel() that the work req is + executing. */ + } uv_mutex_unlock(&mutex); @@ -58,6 +64,8 @@ static void worker(void* arg) { w->work(w); uv_mutex_lock(&w->loop->wq_mutex); + w->work = NULL; /* Signal uv_work_cancel() that the work req is done + executing. */ ngx_queue_insert_tail(&w->loop->wq, &w->wq); uv_async_send(&w->loop->wq_async); uv_mutex_unlock(&w->loop->wq_mutex); @@ -176,6 +184,30 @@ int uv_work_queue(uv_loop_t* loop, } +int uv_work_cancel(uv_work_t* req) { + struct uv__work* w; + int cancelled; + + w = &req->work_req; + uv_mutex_lock(&mutex); + uv_mutex_lock(&w->loop->wq_mutex); + + cancelled = !ngx_queue_empty(&w->wq) && w->work != NULL; + if (cancelled) + ngx_queue_remove(&w->wq); + + uv_mutex_unlock(&w->loop->wq_mutex); + uv_mutex_unlock(&mutex); + + if (cancelled) { + uv__req_unregister(req->loop, req); + return 0; + } + + return -1; +} + + int uv_queue_work(uv_loop_t* loop, uv_work_t* req, uv_work_cb work_cb, diff --git a/src/win/threadpool.c b/src/win/threadpool.c index 8278aef46b..37928b9447 100644 --- a/src/win/threadpool.c +++ b/src/win/threadpool.c @@ -70,6 +70,11 @@ int uv_work_queue(uv_loop_t* loop, uv_work_t* req, uv_work_cb work_cb, } +int uv_work_cancel(uv_work_t* req) { + return -1; +} + + void uv_process_work_req(uv_loop_t* loop, uv_work_t* req) { uv__req_unregister(loop, req); if(req->after_work_cb) diff --git a/test/test-list.h b/test/test-list.h index f5cc95c6b3..3ae4ecc058 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -186,6 +186,7 @@ TEST_DECLARE (fs_rename_to_existing_file) TEST_DECLARE (threadpool_queue_work_simple) TEST_DECLARE (threadpool_queue_work_einval) TEST_DECLARE (threadpool_multiple_event_loops) +TEST_DECLARE (threadpool_cancel_work) TEST_DECLARE (thread_mutex) TEST_DECLARE (thread_rwlock) TEST_DECLARE (thread_create) @@ -451,6 +452,7 @@ TASK_LIST_START TEST_ENTRY (threadpool_queue_work_simple) TEST_ENTRY (threadpool_queue_work_einval) TEST_ENTRY (threadpool_multiple_event_loops) + TEST_ENTRY (threadpool_cancel_work) TEST_ENTRY (thread_mutex) TEST_ENTRY (thread_rwlock) TEST_ENTRY (thread_create) diff --git a/test/test-threadpool-cancel.c b/test/test-threadpool-cancel.c new file mode 100644 index 0000000000..d6f7f77618 --- /dev/null +++ b/test/test-threadpool-cancel.c @@ -0,0 +1,113 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "uv.h" +#include "task.h" + +static uv_cond_t signal_cond; +static uv_mutex_t signal_mutex; +static uv_mutex_t wait_mutex; +static unsigned done_cb_called; +static unsigned timer_cb_called; +static uv_work_t reqs[16]; + + +static void work_cb(uv_work_t* req) { + uv_mutex_lock(&signal_mutex); + uv_cond_signal(&signal_cond); + uv_mutex_unlock(&signal_mutex); + + uv_mutex_lock(&wait_mutex); + uv_mutex_unlock(&wait_mutex); +} + + +static void done_cb(uv_work_t* req) { + done_cb_called++; + free(req); +} + + +static void fail_cb(uv_work_t* req) { + ASSERT(0 && "fail_cb called"); +} + + +static void timer_cb(uv_timer_t* handle, int status) { + unsigned i; + + for (i = 0; i < ARRAY_SIZE(reqs); i++) + ASSERT(0 == uv_work_cancel(reqs + i)); + + uv_close((uv_handle_t*) handle, NULL); + uv_mutex_unlock(&signal_mutex); + uv_mutex_unlock(&wait_mutex); + timer_cb_called++; +} + + +/* Submit as many work requests as it takes to saturate the thread pool, + * then submit more and try to cancel them. + */ +TEST_IMPL(threadpool_cancel_work) { + uv_timer_t timer_handle; + unsigned num_threads; + unsigned i; + uv_work_t* req; + + ASSERT(0 == uv_cond_init(&signal_cond)); + ASSERT(0 == uv_mutex_init(&signal_mutex)); + ASSERT(0 == uv_mutex_init(&wait_mutex)); + + uv_mutex_lock(&signal_mutex); + uv_mutex_lock(&wait_mutex); + + for (num_threads = 0; /* empty */; num_threads++) { + req = malloc(sizeof(*req)); + ASSERT(req != NULL); + ASSERT(0 == uv_work_queue(uv_default_loop(), req, work_cb, done_cb)); + + /* Expect to get signalled within 350 ms, otherwise assume that + * the thread pool is saturated. As with any timing dependent test, + * this is obviously not ideal. + */ + if (uv_cond_timedwait(&signal_cond, &signal_mutex, 350 * 1e6)) { + ASSERT(0 == uv_work_cancel(req)); + free(req); + break; + } + } + + for (i = 0; i < ARRAY_SIZE(reqs); i++) + ASSERT(0 == uv_work_queue(uv_default_loop(), reqs + i, fail_cb, NULL)); + + ASSERT(0 == uv_timer_init(uv_default_loop(), &timer_handle)); + ASSERT(0 == uv_timer_start(&timer_handle, timer_cb, 10, 0)); + ASSERT(0 == uv_run(uv_default_loop())); + + ASSERT(done_cb_called == num_threads); + + uv_cond_destroy(&signal_cond); + uv_mutex_destroy(&signal_mutex); + uv_mutex_destroy(&wait_mutex); + + return 0; +} diff --git a/uv.gyp b/uv.gyp index 749decdea7..c1cff46caf 100644 --- a/uv.gyp +++ b/uv.gyp @@ -299,6 +299,7 @@ 'test/test-tcp-writealot.c', 'test/test-tcp-unexpected-read.c', 'test/test-threadpool.c', + 'test/test-threadpool-cancel.c', 'test/test-mutexes.c', 'test/test-thread.c', 'test/test-barrier.c',