Skip to content
This repository has been archived by the owner on May 4, 2018. It is now read-only.

Commit

Permalink
unix: rework uv_cancel() api
Browse files Browse the repository at this point in the history
Bert Belder informs me the current approach where a request is immediately
cancelled, is impossible to implement on Windows.

Rework the API to always invoke the "done" callback with an UV_ECANCELED error
code.
  • Loading branch information
bnoordhuis committed Dec 13, 2012
1 parent 731adac commit 92fb84b
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 53 deletions.
13 changes: 12 additions & 1 deletion include/uv.h
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ typedef void (*uv_exit_cb)(uv_process_t*, int exit_status, int term_signal);
typedef void (*uv_walk_cb)(uv_handle_t* handle, void* arg);
typedef void (*uv_fs_cb)(uv_fs_t* req);
typedef void (*uv_work_cb)(uv_work_t* req);
typedef void (*uv_after_work_cb)(uv_work_t* req);
typedef void (*uv_after_work_cb)(uv_work_t* req, int status);
typedef void (*uv_getaddrinfo_cb)(uv_getaddrinfo_t* req,
int status,
struct addrinfo* res);
Expand Down Expand Up @@ -1394,6 +1394,17 @@ UV_EXTERN int uv_queue_work(uv_loop_t* loop, uv_work_t* req,
* Only cancellation of uv_fs_t, uv_getaddrinfo_t and uv_work_t requests is
* currently supported.
*
* Cancelled requests have their callbacks invoked some time in the future.
* It's _not_ safe to free the memory associated with the request until your
* callback is called.
*
* Here is how cancellation is reported to your callback:
*
* - A uv_fs_t request has its req->errorno field set to UV_ECANCELED.
*
* - A uv_work_t or uv_getaddrinfo_t request has its callback invoked with
* status == -1 and uv_last_error(loop).code == UV_ECANCELED.
*
* This function is currently only implemented on UNIX platforms. On Windows,
* it always returns -1.
*/
Expand Down
11 changes: 6 additions & 5 deletions src/unix/fs.c
Original file line number Diff line number Diff line change
Expand Up @@ -522,16 +522,17 @@ static void uv__fs_done(struct uv__work* w, int status) {
req = container_of(w, uv_fs_t, work_req);
uv__req_unregister(req->loop, req);

if (status != 0) {
uv_fs_req_cleanup(req);
return;
}

if (req->errorno != 0) {
req->errorno = uv_translate_sys_error(req->errorno);
uv__set_artificial_error(req->loop, req->errorno);
}

if (status == -UV_ECANCELED) {
assert(req->errorno == 0);
req->errorno = UV_ECANCELED;
uv__set_artificial_error(req->loop, UV_ECANCELED);
}

if (req->cb != NULL)
req->cb(req);
}
Expand Down
9 changes: 6 additions & 3 deletions src/unix/getaddrinfo.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@ static void uv__getaddrinfo_done(struct uv__work* w, int status) {
req->service = NULL;
req->hostname = NULL;

if (status != 0)
return;

if (req->retcode == 0) {
/* OK */
#if EAI_NODATA /* FreeBSD deprecated EAI_NODATA */
Expand All @@ -87,6 +84,12 @@ static void uv__getaddrinfo_done(struct uv__work* w, int status) {
req->loop->last_err.sys_errno_ = req->retcode;
}

if (status == -UV_ECANCELED) {
assert(req->retcode == 0);
req->retcode = UV_ECANCELED;
uv__set_artificial_error(req->loop, UV_ECANCELED);
}

req->cb(req, req->retcode, res);
}

Expand Down
27 changes: 20 additions & 7 deletions src/unix/threadpool.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/

#include "internal.h"
#include <stdlib.h>

static uv_once_t once = UV_ONCE_INIT;
static uv_cond_t cond;
Expand All @@ -30,6 +31,11 @@ static ngx_queue_t wq;
static volatile int initialized;


static void uv__cancelled(struct uv__work* w) {
abort();
}


/* To avoid deadlock with uv_cancel() it's crucial that the worker
* never holds the global mutex and the loop-local mutex at the same time.
*/
Expand Down Expand Up @@ -149,8 +155,10 @@ int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
if (!cancelled)
return -1;

ngx_queue_init(&w->wq);
w->done(w, -UV_ECANCELED);
w->work = uv__cancelled;
uv_mutex_lock(&loop->wq_mutex);
ngx_queue_insert_tail(&loop->wq, &w->wq);
uv_mutex_unlock(&loop->wq_mutex);

return 0;
}
Expand All @@ -161,6 +169,7 @@ void uv__work_done(uv_async_t* handle, int status) {
uv_loop_t* loop;
ngx_queue_t* q;
ngx_queue_t wq;
int err;

loop = container_of(handle, uv_loop_t, wq_async);
ngx_queue_init(&wq);
Expand All @@ -177,7 +186,8 @@ void uv__work_done(uv_async_t* handle, int status) {
ngx_queue_remove(q);

w = container_of(q, struct uv__work, wq);
w->done(w, 0);
err = (w->work == uv__cancelled) ? -UV_ECANCELED : 0;
w->done(w, err);
}
}

Expand All @@ -190,15 +200,18 @@ static void uv__queue_work(struct uv__work* w) {


static void uv__queue_done(struct uv__work* w, int status) {
uv_work_t* req = container_of(w, uv_work_t, work_req);
uv_work_t* req;

req = container_of(w, uv_work_t, work_req);
uv__req_unregister(req->loop, req);

if (status != 0)
if (req->after_work_cb == NULL)
return;

if (req->after_work_cb)
req->after_work_cb(req);
if (status == -UV_ECANCELED)
uv__set_artificial_error(req->loop, UV_ECANCELED);

req->after_work_cb(req, status ? -1 : 0);
}


Expand Down
2 changes: 1 addition & 1 deletion src/win/threadpool.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,5 +78,5 @@ int uv_cancel(uv_req_t* req) {
void uv_process_work_req(uv_loop_t* loop, uv_work_t* req) {
uv__req_unregister(loop, req);
if(req->after_work_cb)
req->after_work_cb(req);
req->after_work_cb(req, 0);
}
110 changes: 75 additions & 35 deletions test/test-threadpool-cancel.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,12 @@ static uv_cond_t signal_cond;
static uv_mutex_t signal_mutex;
static uv_mutex_t wait_mutex;
static unsigned num_threads;
static unsigned fs_cb_called;
static unsigned work_cb_called;
static unsigned done_cb_called;
static unsigned done2_cb_called;
static unsigned timer_cb_called;
static unsigned getaddrinfo_cb_called;


static void work_cb(uv_work_t* req) {
Expand All @@ -52,10 +56,12 @@ static void work_cb(uv_work_t* req) {

uv_mutex_lock(&wait_mutex);
uv_mutex_unlock(&wait_mutex);

work_cb_called++;
}


static void done_cb(uv_work_t* req) {
static void done_cb(uv_work_t* req, int status) {
done_cb_called++;
free(req);
}
Expand All @@ -82,7 +88,6 @@ static void saturate_threadpool(void) {
*/
if (uv_cond_timedwait(&signal_cond, &signal_mutex, 350 * 1e6)) {
ASSERT(0 == uv_cancel((uv_req_t*) req));
free(req);
break;
}
}
Expand All @@ -96,15 +101,40 @@ static void unblock_threadpool(void) {


static void cleanup_threadpool(void) {
ASSERT(done_cb_called == num_threads);
ASSERT(done_cb_called == num_threads + 1); /* +1 == cancelled work req. */
ASSERT(work_cb_called == num_threads);

uv_cond_destroy(&signal_cond);
uv_mutex_destroy(&signal_mutex);
uv_mutex_destroy(&wait_mutex);
}


static void fail_cb(/* empty */) {
ASSERT(0 && "fail_cb called");
static void fs_cb(uv_fs_t* req) {
ASSERT(req->errorno == UV_ECANCELED);
uv_fs_req_cleanup(req);
fs_cb_called++;
}


static void getaddrinfo_cb(uv_getaddrinfo_t* req,
int status,
struct addrinfo* res) {
ASSERT(UV_ECANCELED == uv_last_error(req->loop).code);
ASSERT(UV_ECANCELED == status);
getaddrinfo_cb_called++;
}


static void work2_cb(uv_work_t* req) {
ASSERT(0 && "work2_cb called");
}


static void done2_cb(uv_work_t* req, int status) {
ASSERT(uv_last_error(req->loop).code == UV_ECANCELED);
ASSERT(status == -1);
done2_cb_called++;
}


Expand All @@ -131,15 +161,23 @@ TEST_IMPL(threadpool_cancel_getaddrinfo) {
struct cancel_info ci;
struct addrinfo hints;
uv_loop_t* loop;
int r;

INIT_CANCEL_INFO(&ci, reqs);
loop = uv_default_loop();
saturate_threadpool();

ASSERT(0 == uv_getaddrinfo(loop, reqs + 0, fail_cb, "fail", NULL, NULL));
ASSERT(0 == uv_getaddrinfo(loop, reqs + 1, fail_cb, NULL, "fail", NULL));
ASSERT(0 == uv_getaddrinfo(loop, reqs + 2, fail_cb, "fail", "fail", NULL));
ASSERT(0 == uv_getaddrinfo(loop, reqs + 3, fail_cb, "fail", NULL, &hints));
r = uv_getaddrinfo(loop, reqs + 0, getaddrinfo_cb, "fail", NULL, NULL);
ASSERT(r == 0);

r = uv_getaddrinfo(loop, reqs + 1, getaddrinfo_cb, NULL, "fail", NULL);
ASSERT(r == 0);

r = uv_getaddrinfo(loop, reqs + 2, getaddrinfo_cb, "fail", "fail", NULL);
ASSERT(r == 0);

r = uv_getaddrinfo(loop, reqs + 3, getaddrinfo_cb, "fail", NULL, &hints);
ASSERT(r == 0);

ASSERT(0 == uv_timer_init(loop, &ci.timer_handle));
ASSERT(0 == uv_timer_start(&ci.timer_handle, timer_cb, 10, 0));
Expand All @@ -163,12 +201,13 @@ TEST_IMPL(threadpool_cancel_work) {
saturate_threadpool();

for (i = 0; i < ARRAY_SIZE(reqs); i++)
ASSERT(0 == uv_queue_work(loop, reqs + i, fail_cb, NULL));
ASSERT(0 == uv_queue_work(loop, reqs + i, work2_cb, done2_cb));

ASSERT(0 == uv_timer_init(loop, &ci.timer_handle));
ASSERT(0 == uv_timer_start(&ci.timer_handle, timer_cb, 10, 0));
ASSERT(0 == uv_run(loop));
ASSERT(1 == timer_cb_called);
ASSERT(ARRAY_SIZE(reqs) == done2_cb_called);

cleanup_threadpool();

Expand All @@ -188,36 +227,37 @@ TEST_IMPL(threadpool_cancel_fs) {

/* Needs to match ARRAY_SIZE(fs_reqs). */
n = 0;
ASSERT(0 == uv_fs_chmod(loop, reqs + n++, "/", 0, fail_cb));
ASSERT(0 == uv_fs_chown(loop, reqs + n++, "/", 0, 0, fail_cb));
ASSERT(0 == uv_fs_close(loop, reqs + n++, 0, fail_cb));
ASSERT(0 == uv_fs_fchmod(loop, reqs + n++, 0, 0, fail_cb));
ASSERT(0 == uv_fs_fchown(loop, reqs + n++, 0, 0, 0, fail_cb));
ASSERT(0 == uv_fs_fdatasync(loop, reqs + n++, 0, fail_cb));
ASSERT(0 == uv_fs_fstat(loop, reqs + n++, 0, fail_cb));
ASSERT(0 == uv_fs_fsync(loop, reqs + n++, 0, fail_cb));
ASSERT(0 == uv_fs_ftruncate(loop, reqs + n++, 0, 0, fail_cb));
ASSERT(0 == uv_fs_futime(loop, reqs + n++, 0, 0, 0, fail_cb));
ASSERT(0 == uv_fs_link(loop, reqs + n++, "/", "/", fail_cb));
ASSERT(0 == uv_fs_lstat(loop, reqs + n++, "/", fail_cb));
ASSERT(0 == uv_fs_mkdir(loop, reqs + n++, "/", 0, fail_cb));
ASSERT(0 == uv_fs_open(loop, reqs + n++, "/", 0, 0, fail_cb));
ASSERT(0 == uv_fs_read(loop, reqs + n++, 0, NULL, 0, 0, fail_cb));
ASSERT(0 == uv_fs_readdir(loop, reqs + n++, "/", 0, fail_cb));
ASSERT(0 == uv_fs_readlink(loop, reqs + n++, "/", fail_cb));
ASSERT(0 == uv_fs_rename(loop, reqs + n++, "/", "/", fail_cb));
ASSERT(0 == uv_fs_mkdir(loop, reqs + n++, "/", 0, fail_cb));
ASSERT(0 == uv_fs_sendfile(loop, reqs + n++, 0, 0, 0, 0, fail_cb));
ASSERT(0 == uv_fs_stat(loop, reqs + n++, "/", fail_cb));
ASSERT(0 == uv_fs_symlink(loop, reqs + n++, "/", "/", 0, fail_cb));
ASSERT(0 == uv_fs_unlink(loop, reqs + n++, "/", fail_cb));
ASSERT(0 == uv_fs_utime(loop, reqs + n++, "/", 0, 0, fail_cb));
ASSERT(0 == uv_fs_write(loop, reqs + n++, 0, NULL, 0, 0, fail_cb));
ASSERT(0 == uv_fs_chmod(loop, reqs + n++, "/", 0, fs_cb));
ASSERT(0 == uv_fs_chown(loop, reqs + n++, "/", 0, 0, fs_cb));
ASSERT(0 == uv_fs_close(loop, reqs + n++, 0, fs_cb));
ASSERT(0 == uv_fs_fchmod(loop, reqs + n++, 0, 0, fs_cb));
ASSERT(0 == uv_fs_fchown(loop, reqs + n++, 0, 0, 0, fs_cb));
ASSERT(0 == uv_fs_fdatasync(loop, reqs + n++, 0, fs_cb));
ASSERT(0 == uv_fs_fstat(loop, reqs + n++, 0, fs_cb));
ASSERT(0 == uv_fs_fsync(loop, reqs + n++, 0, fs_cb));
ASSERT(0 == uv_fs_ftruncate(loop, reqs + n++, 0, 0, fs_cb));
ASSERT(0 == uv_fs_futime(loop, reqs + n++, 0, 0, 0, fs_cb));
ASSERT(0 == uv_fs_link(loop, reqs + n++, "/", "/", fs_cb));
ASSERT(0 == uv_fs_lstat(loop, reqs + n++, "/", fs_cb));
ASSERT(0 == uv_fs_mkdir(loop, reqs + n++, "/", 0, fs_cb));
ASSERT(0 == uv_fs_open(loop, reqs + n++, "/", 0, 0, fs_cb));
ASSERT(0 == uv_fs_read(loop, reqs + n++, 0, NULL, 0, 0, fs_cb));
ASSERT(0 == uv_fs_readdir(loop, reqs + n++, "/", 0, fs_cb));
ASSERT(0 == uv_fs_readlink(loop, reqs + n++, "/", fs_cb));
ASSERT(0 == uv_fs_rename(loop, reqs + n++, "/", "/", fs_cb));
ASSERT(0 == uv_fs_mkdir(loop, reqs + n++, "/", 0, fs_cb));
ASSERT(0 == uv_fs_sendfile(loop, reqs + n++, 0, 0, 0, 0, fs_cb));
ASSERT(0 == uv_fs_stat(loop, reqs + n++, "/", fs_cb));
ASSERT(0 == uv_fs_symlink(loop, reqs + n++, "/", "/", 0, fs_cb));
ASSERT(0 == uv_fs_unlink(loop, reqs + n++, "/", fs_cb));
ASSERT(0 == uv_fs_utime(loop, reqs + n++, "/", 0, 0, fs_cb));
ASSERT(0 == uv_fs_write(loop, reqs + n++, 0, NULL, 0, 0, fs_cb));
ASSERT(n == ARRAY_SIZE(reqs));

ASSERT(0 == uv_timer_init(loop, &ci.timer_handle));
ASSERT(0 == uv_timer_start(&ci.timer_handle, timer_cb, 10, 0));
ASSERT(0 == uv_run(loop));
ASSERT(n == fs_cb_called);
ASSERT(1 == timer_cb_called);

cleanup_threadpool();
Expand Down
3 changes: 2 additions & 1 deletion test/test-threadpool.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ static void work_cb(uv_work_t* req) {
}


static void after_work_cb(uv_work_t* req) {
static void after_work_cb(uv_work_t* req, int status) {
ASSERT(status == 0);
ASSERT(req == &work_req);
ASSERT(req->data == &data);
after_work_cb_count++;
Expand Down

2 comments on commit 92fb84b

@isaacs
Copy link

@isaacs isaacs commented on 92fb84b Dec 13, 2012

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing the signature of uv_work is going to break a bunch of addons. Is there any way to make the status param to the after work cb optional?

@bnoordhuis
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The short answer is 'no'. The longer answer is that it was an omission in the API to start with and the alternatives are worse:

  1. Add a status field to uv_work_t. Makes it unlike any other libuv API except uv_fs_t (which itself is a blemish).
  2. Compiler or preprocessor trickery like int uv_queue_work(uv_loop_t*, uv_work_t*, uv_work_cb, ...) or #define uv_queue_work(a, b, c, d) uv_queue_work(a, b, c, (uv_after_work_cb) d). Brittle, doesn't work for overloaded functions.
  3. Symbol versioning. Pure brain damage and compiler/linker specific to boot.

Please sign in to comment.