unix: add uv_cancel()
bnoordhuis committed Dec 9, 2012
1 parent a385ae4 commit 52c8a86
Showing 10 changed files with 331 additions and 9 deletions.
2 changes: 1 addition & 1 deletion include/uv-private/uv-unix.h
@@ -60,7 +60,7 @@ struct uv__io_s {

struct uv__work {
void (*work)(struct uv__work *w);
void (*done)(struct uv__work *w);
void (*done)(struct uv__work *w, int status);
struct uv_loop_s* loop;
ngx_queue_t wq;
};
13 changes: 13 additions & 0 deletions include/uv.h
@@ -1386,6 +1386,19 @@ struct uv_work_s {
UV_EXTERN int uv_queue_work(uv_loop_t* loop, uv_work_t* req,
uv_work_cb work_cb, uv_after_work_cb after_work_cb);

/* Cancel a pending request. Fails if the request is executing or has finished
* executing.
*
* Returns 0 on success, -1 on error. The loop error code is not touched.
*
* Only cancellation of uv_fs_t, uv_getaddrinfo_t and uv_work_t requests is
* currently supported.
*
* This function is currently only implemented on UNIX platforms. On Windows,
* it always returns -1.
*/
UV_EXTERN int uv_cancel(uv_req_t* req);


struct uv_cpu_info_s {
char* model;
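
A minimal usage sketch of the uv_cancel() API declared above (not part of this commit), assuming the 2012-era libuv interfaces; the callbacks slow_work and after_work are hypothetical names and error handling is omitted:

#include <stdio.h>
#include "uv.h"

static void slow_work(uv_work_t* req) {
  /* Long-running job; runs on a threadpool thread unless cancelled first. */
  (void) req;
}

static void after_work(uv_work_t* req) {
  /* At this commit a successfully cancelled uv_work_t never reaches its
   * after_work_cb, so this only runs when cancellation came too late. */
  (void) req;
  printf("work ran to completion\n");
}

int main(void) {
  uv_work_t req;

  uv_queue_work(uv_default_loop(), &req, slow_work, after_work);

  if (uv_cancel((uv_req_t*) &req) == 0)
    printf("cancelled before a worker picked it up\n");
  else
    printf("too late: already executing or finished\n");

  uv_run(uv_default_loop());  /* uv_run() took only the loop at this point. */
  return 0;
}
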
9 changes: 7 additions & 2 deletions src/unix/fs.c
@@ -90,7 +90,7 @@
} \
else { \
uv__fs_work(&(req)->work_req); \
uv__fs_done(&(req)->work_req); \
uv__fs_done(&(req)->work_req, 0); \
return (req)->result; \
} \
} \
@@ -516,12 +516,17 @@ static void uv__fs_work(struct uv__work* w) {
}


static void uv__fs_done(struct uv__work* w) {
static void uv__fs_done(struct uv__work* w, int status) {
uv_fs_t* req;

req = container_of(w, uv_fs_t, work_req);
uv__req_unregister(req->loop, req);

if (status != 0) {
uv_fs_req_cleanup(req);
return;
}

if (req->errorno != 0) {
req->errorno = uv_translate_sys_error(req->errorno);
uv__set_artificial_error(req->loop, req->errorno);
9 changes: 8 additions & 1 deletion src/unix/getaddrinfo.c
@@ -37,7 +37,7 @@ static void uv__getaddrinfo_work(struct uv__work* w) {
}


static void uv__getaddrinfo_done(struct uv__work* w) {
static void uv__getaddrinfo_done(struct uv__work* w, int status) {
uv_getaddrinfo_t* req = container_of(w, uv_getaddrinfo_t, work_req);
struct addrinfo *res = req->res;
#if __sun
@@ -63,6 +63,13 @@ static void uv__getaddrinfo_done(struct uv__work* w) {
else
assert(0);

req->hints = NULL;
req->service = NULL;
req->hostname = NULL;

if (status != 0)
return;

if (req->retcode == 0) {
/* OK */
#if EAI_NODATA /* FreeBSD deprecated EAI_NODATA */
2 changes: 1 addition & 1 deletion src/unix/internal.h
@@ -174,7 +174,7 @@ void uv__signal_loop_cleanup();
void uv__work_submit(uv_loop_t* loop,
struct uv__work *w,
void (*work)(struct uv__work *w),
void (*done)(struct uv__work *w));
void (*done)(struct uv__work *w, int status));
void uv__work_done(uv_async_t* handle, int status);

/* platform specific */
67 changes: 63 additions & 4 deletions src/unix/threadpool.c
@@ -30,6 +30,9 @@ static ngx_queue_t wq;
static volatile int initialized;


/* To avoid deadlock with uv_cancel() it's crucial that the worker
* never holds the global mutex and the loop-local mutex at the same time.
*/
static void worker(void* arg) {
struct uv__work* w;
ngx_queue_t* q;
@@ -46,8 +49,11 @@

if (q == &exit_message)
uv_cond_signal(&cond);
else
else {
ngx_queue_remove(q);
ngx_queue_init(q); /* Signal uv_cancel() that the work req is
executing. */
}

uv_mutex_unlock(&mutex);

@@ -58,6 +64,8 @@
w->work(w);

uv_mutex_lock(&w->loop->wq_mutex);
w->work = NULL; /* Signal uv_cancel() that the work req is done
executing. */
ngx_queue_insert_tail(&w->loop->wq, &w->wq);
uv_async_send(&w->loop->wq_async);
uv_mutex_unlock(&w->loop->wq_mutex);
@@ -116,7 +124,7 @@ static void cleanup(void) {
void uv__work_submit(uv_loop_t* loop,
struct uv__work* w,
void (*work)(struct uv__work* w),
void (*done)(struct uv__work* w)) {
void (*done)(struct uv__work* w, int status)) {
uv_once(&once, init_once);
w->loop = loop;
w->work = work;
@@ -125,6 +133,29 @@ void uv__work_submit(uv_loop_t* loop,
}


int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
int cancelled;

uv_mutex_lock(&mutex);
uv_mutex_lock(&w->loop->wq_mutex);

cancelled = !ngx_queue_empty(&w->wq) && w->work != NULL;
if (cancelled)
ngx_queue_remove(&w->wq);

uv_mutex_unlock(&w->loop->wq_mutex);
uv_mutex_unlock(&mutex);

if (!cancelled)
return -1;

ngx_queue_init(&w->wq);
w->done(w, -UV_ECANCELED);

return 0;
}


void uv__work_done(uv_async_t* handle, int status) {
struct uv__work* w;
uv_loop_t* loop;
@@ -146,7 +177,7 @@ void uv__work_done(uv_async_t* handle, int status) {
ngx_queue_remove(q);

w = container_of(q, struct uv__work, wq);
w->done(w);
w->done(w, 0);
}
}

@@ -158,11 +189,14 @@ static void uv__queue_work(struct uv__work* w) {
}


static void uv__queue_done(struct uv__work* w) {
static void uv__queue_done(struct uv__work* w, int status) {
uv_work_t* req = container_of(w, uv_work_t, work_req);

uv__req_unregister(req->loop, req);

if (status != 0)
return;

if (req->after_work_cb)
req->after_work_cb(req);
}
@@ -182,3 +216,28 @@ int uv_queue_work(uv_loop_t* loop,
uv__work_submit(loop, &req->work_req, uv__queue_work, uv__queue_done);
return 0;
}


int uv_cancel(uv_req_t* req) {
struct uv__work* wreq;
uv_loop_t* loop;

switch (req->type) {
case UV_FS:
loop = ((uv_fs_t*) req)->loop;
wreq = &((uv_fs_t*) req)->work_req;
break;
case UV_GETADDRINFO:
loop = ((uv_getaddrinfo_t*) req)->loop;
wreq = &((uv_getaddrinfo_t*) req)->work_req;
break;
case UV_WORK:
loop = ((uv_work_t*) req)->loop;
wreq = &((uv_work_t*) req)->work_req;
break;
default:
return -1;
}

return uv__work_cancel(loop, req, wreq);
}
5 changes: 5 additions & 0 deletions src/win/threadpool.c
@@ -70,6 +70,11 @@ int uv_queue_work(uv_loop_t* loop, uv_work_t* req, uv_work_cb work_cb,
}


int uv_cancel(uv_req_t* req) {
return -1;
}


void uv_process_work_req(uv_loop_t* loop, uv_work_t* req) {
uv__req_unregister(loop, req);
if(req->after_work_cb)
6 changes: 6 additions & 0 deletions test/test-list.h
@@ -187,6 +187,9 @@ TEST_DECLARE (fs_rename_to_existing_file)
TEST_DECLARE (threadpool_queue_work_simple)
TEST_DECLARE (threadpool_queue_work_einval)
TEST_DECLARE (threadpool_multiple_event_loops)
TEST_DECLARE (threadpool_cancel_getaddrinfo)
TEST_DECLARE (threadpool_cancel_work)
TEST_DECLARE (threadpool_cancel_fs)
TEST_DECLARE (thread_mutex)
TEST_DECLARE (thread_rwlock)
TEST_DECLARE (thread_create)
@@ -454,6 +457,9 @@ TASK_LIST_START
TEST_ENTRY (threadpool_queue_work_simple)
TEST_ENTRY (threadpool_queue_work_einval)
TEST_ENTRY (threadpool_multiple_event_loops)
TEST_ENTRY (threadpool_cancel_getaddrinfo)
TEST_ENTRY (threadpool_cancel_work)
TEST_ENTRY (threadpool_cancel_fs)
TEST_ENTRY (thread_mutex)
TEST_ENTRY (thread_rwlock)
TEST_ENTRY (thread_create)

1 comment on commit 52c8a86

@ry (Contributor) commented on 52c8a86 Dec 10, 2012
sweet
