WIP add uv_work_task()

Proof-of-concept to demonstrate an API for queueing work from any thread,
without needing to be attached to a specific event loop.
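
For reference, here's a minimal usage sketch of the proposed call (the callback
and semaphore names are illustrative; it mirrors the new threadpool_task test
below, and freeing the req inside the work_cb relies on the done == NULL
handling added to uv__queue_work):

    #include <stdlib.h>
    #include <uv.h>

    static uv_sem_t done_sem;

    static void my_work_cb(uv_work_t* req) {
      /* Runs on a threadpool thread; no event loop is involved. */
      free(req);
      uv_sem_post(&done_sem);
    }

    int main(void) {
      uv_work_t* req = malloc(sizeof(*req));

      uv_sem_init(&done_sem, 0);
      /* Callable from any thread; no uv_loop_t is attached to the req. */
      uv_work_task(req, my_work_cb);
      uv_sem_wait(&done_sem);
      uv_sem_destroy(&done_sem);
      return 0;
    }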

There's a problem when running the threadpool_task test: uv_library_shutdown()
runs even though it's never called, which causes the test to abort. Here's the
stack:

    * thread #1, name = 'uv_run_tests_a', stop reason = signal SIGABRT
        frame #6: 0x00000000004ce768 uv_run_tests_a`uv__threadpool_cleanup at threadpool.c:192:3
        frame #7: 0x00000000004d5d56 uv_run_tests_a`uv_library_shutdown at uv-common.c:956:3
        frame #8: 0x00007ffff7fc924e
        frame #9: 0x00007ffff7c45495 libc.so.6`__run_exit_handlers(status=0, listp=<unavailable>, run_list_atexit=true, run_dtors=true) at exit.c:113:8

Can't figure out why this is happening.

Another problem: if the uv_sleep() is removed from task_cb(), there's another
abort when trying to lock the mutex. Here's the stack:

        frame #8: 0x00000000004ed375 uv_run_tests_a`uv_mutex_lock(mutex=0x0000000001460c50) at thread.c:344:7
        frame #9: 0x00000000004cebdf uv_run_tests_a`uv__queue_work(w=0x0000000001462378) at threadpool.c:342:5
    (lldb) f 9
    frame #9: 0x00000000004cebdf uv_run_tests_a`uv__queue_work(w=0x0000000001462378) at threadpool.c:342:5
       341    if (w->done == NULL) {
    -> 342      uv_mutex_lock(&mutex);
       343      w->work = NULL;

Still not sure if this is a race condition.
trevnorris committed Mar 31, 2023
1 parent 4731fec commit b111005
Showing 13 changed files with 114 additions and 19 deletions.
4 changes: 4 additions & 0 deletions include/uv.h
@@ -1100,6 +1100,10 @@ UV_EXTERN int uv_queue_work(uv_loop_t* loop,
uv_work_t* req,
uv_work_cb work_cb,
uv_after_work_cb after_work_cb);
UV_EXTERN int uv_work_task(uv_work_t* req, uv_work_cb work_cb);
UV_EXTERN int uv_work_done(uv_loop_t* loop,
uv_work_t* req,
uv_after_work_cb after_work_cb);

UV_EXTERN int uv_cancel(uv_req_t* req);

2 changes: 1 addition & 1 deletion include/uv/threadpool.h
@@ -28,7 +28,7 @@
#define UV_THREADPOOL_H_

struct uv__work {
void (*work)(struct uv__work *w);
int (*work)(struct uv__work *w);
void (*done)(struct uv__work *w, int status);
struct uv_loop_s* loop;
void* wq[2];
4 changes: 3 additions & 1 deletion src/random.c
@@ -70,11 +70,13 @@ static int uv__random(void* buf, size_t buflen) {
}


static void uv__random_work(struct uv__work* w) {
static int uv__random_work(struct uv__work* w) {
uv_random_t* req;

req = container_of(w, uv_random_t, work_req);
req->status = uv__random(req->buf, req->buflen);

return 0;
}


63 changes: 55 additions & 8 deletions src/threadpool.c
@@ -46,8 +46,9 @@ static unsigned int slow_work_thread_threshold(void) {
return (nthreads + 1) / 2;
}

static void uv__cancelled(struct uv__work* w) {
static int uv__cancelled(struct uv__work* w) {
abort();
return 0;
}


@@ -119,7 +120,12 @@ static void worker(void* arg) {
uv_mutex_unlock(&mutex);

w = QUEUE_DATA(q, struct uv__work, wq);
w->work(w);

/* Work queued via uv_work_task() will return > 0 here, which means there's
 * nothing else to do. No need to worry about the is_slow_work check below
 * since it's always false in the case of uv_work_task(). */
if (w->work(w))
continue;

uv_mutex_lock(&w->loop->wq_mutex);
w->work = NULL; /* Signal uv_cancel() that the work req is done
@@ -265,7 +271,7 @@ static void init_once(void) {
void uv__work_submit(uv_loop_t* loop,
struct uv__work* w,
enum uv__work_kind kind,
void (*work)(struct uv__work* w),
int (*work)(struct uv__work* w),
void (*done)(struct uv__work* w, int status)) {
uv_once(&once, init_once);
w->loop = loop;
@@ -324,15 +330,24 @@ void uv__work_done(uv_async_t* handle) {
}


static void uv__queue_work(struct uv__work* w) {
static int uv__queue_work(struct uv__work* w) {
uv_work_t* req = container_of(w, uv_work_t, work_req);
int r = 0;

/* TODO: Referring back to "threadpool: skip done callback if after_cb is NULL",
 * what if we detected done == NULL before calling work_cb() and removed it
 * from the queue here? That would allow the user to free() the req from the
 * work_cb. */
/* Remove the req from the queue if it was supplied using uv_work_task(). */
/* TODO(trevnorris): If uv_cancel() is called between when this function was
 * called and this lock, then it'll still run work_cb(). Should we add a
 * check once the lock is acquired to see if that's happened? */
if (w->done == NULL) {
uv_mutex_lock(&mutex);
w->work = NULL;
QUEUE_REMOVE(&w->wq);
uv_mutex_unlock(&mutex);
r = 1;
}

req->work_cb(req);
return r;
}


@@ -369,6 +384,38 @@ int uv_queue_work(uv_loop_t* loop,
}


int uv_work_task(uv_work_t* req, uv_work_cb work_cb) {
if (work_cb == NULL)
return UV_EINVAL;

/* Don't use uv__req_init() since we're not increasing the active req count
* on any given uv_loop_t. */
UV_REQ_INIT(req, UV_WORK);
req->work_cb = work_cb;
/* Queueing without uv__queue_done will cause the req to be removed from the
 * queue before calling the uv_work_cb. */
uv__work_submit(NULL,
&req->work_req,
UV__WORK_CPU,
uv__queue_work,
NULL);

return 0;
}


int uv_work_done(uv_loop_t* loop,
uv_work_t* req,
uv_after_work_cb after_work_cb) {
if (after_work_cb == NULL)
return UV_EINVAL;

/* TODO(trevnorris): Finish. */

return 0;
}


int uv_cancel(uv_req_t* req) {
struct uv__work* wreq;
uv_loop_t* loop;
4 changes: 3 additions & 1 deletion src/unix/fs.c
@@ -1710,7 +1710,7 @@ static ssize_t uv__fs_write_all(uv_fs_t* req) {
}


static void uv__fs_work(struct uv__work* w) {
static int uv__fs_work(struct uv__work* w) {
int retry_on_eintr;
uv_fs_t* req;
ssize_t r;
@@ -1779,6 +1779,8 @@ static void uv__fs_work(struct uv__work* w) {
req->fs_type == UV_FS_LSTAT)) {
req->ptr = &req->statbuf;
}

return 0;
}


4 changes: 3 additions & 1 deletion src/unix/getaddrinfo.c
@@ -95,13 +95,15 @@ int uv__getaddrinfo_translate_error(int sys_err) {
}


static void uv__getaddrinfo_work(struct uv__work* w) {
static int uv__getaddrinfo_work(struct uv__work* w) {
uv_getaddrinfo_t* req;
int err;

req = container_of(w, uv_getaddrinfo_t, work_req);
err = getaddrinfo(req->hostname, req->service, req->hints, &req->addrinfo);
req->retcode = uv__getaddrinfo_translate_error(err);

return 0;
}


4 changes: 3 additions & 1 deletion src/unix/getnameinfo.c
@@ -28,7 +28,7 @@
#include "internal.h"


static void uv__getnameinfo_work(struct uv__work* w) {
static int uv__getnameinfo_work(struct uv__work* w) {
uv_getnameinfo_t* req;
int err;
socklen_t salen;
@@ -50,6 +50,8 @@ static void uv__getnameinfo_work(struct uv__work* w) {
sizeof(req->service),
req->flags);
req->retcode = uv__getaddrinfo_translate_error(err);

return 0;
}

static void uv__getnameinfo_done(struct uv__work* w, int status) {
2 changes: 1 addition & 1 deletion src/uv-common.h
@@ -206,7 +206,7 @@ enum uv__work_kind {
void uv__work_submit(uv_loop_t* loop,
struct uv__work *w,
enum uv__work_kind kind,
void (*work)(struct uv__work *w),
int (*work)(struct uv__work *w),
void (*done)(struct uv__work *w, int status));

void uv__work_done(uv_async_t* handle);
4 changes: 3 additions & 1 deletion src/win/fs.c
@@ -2850,7 +2850,7 @@ static void fs__statfs(uv_fs_t* req) {
}


static void uv__fs_work(struct uv__work* w) {
static int uv__fs_work(struct uv__work* w) {
uv_fs_t* req;

req = container_of(w, uv_fs_t, work_req);
@@ -2897,6 +2897,8 @@ static void uv__fs_work(struct uv__work* w) {
default:
assert(!"bad uv_fs_type");
}

return 0;
}


4 changes: 3 additions & 1 deletion src/win/getaddrinfo.c
@@ -80,7 +80,7 @@ int uv__getaddrinfo_translate_error(int sys_err) {
#define NDIS_IF_MAX_STRING_SIZE IF_MAX_STRING_SIZE
#endif

static void uv__getaddrinfo_work(struct uv__work* w) {
static int uv__getaddrinfo_work(struct uv__work* w) {
uv_getaddrinfo_t* req;
struct addrinfoW* hints;
int err;
@@ -90,6 +90,8 @@ static void uv__getaddrinfo_work(struct uv__work* w) {
req->addrinfow = NULL;
err = GetAddrInfoW(req->node, req->service, hints, &req->addrinfow);
req->retcode = uv__getaddrinfo_translate_error(err);

return 0;
}


7 changes: 4 additions & 3 deletions src/win/getnameinfo.c
@@ -38,7 +38,7 @@ int WSAAPI GetNameInfoW(
);
#endif

static void uv__getnameinfo_work(struct uv__work* w) {
static int uv__getnameinfo_work(struct uv__work* w) {
uv_getnameinfo_t* req;
WCHAR host[NI_MAXHOST];
WCHAR service[NI_MAXSERV];
@@ -54,7 +54,7 @@ static void uv__getnameinfo_work(struct uv__work* w) {
req->flags)) {
ret = WSAGetLastError();
req->retcode = uv__getaddrinfo_translate_error(ret);
return;
return 0;
}

ret = WideCharToMultiByte(CP_UTF8,
@@ -67,7 +67,7 @@ static void uv__getnameinfo_work(struct uv__work* w) {
NULL);
if (ret == 0) {
req->retcode = uv_translate_sys_error(GetLastError());
return;
return 0;
}

ret = WideCharToMultiByte(CP_UTF8,
@@ -81,6 +81,7 @@ static void uv__getnameinfo_work(struct uv__work* w) {
if (ret == 0) {
req->retcode = uv_translate_sys_error(GetLastError());
}
return 0;
}


2 changes: 2 additions & 0 deletions test/test-list.h
@@ -445,6 +445,7 @@ TEST_DECLARE (strtok)
TEST_DECLARE (threadpool_queue_work_simple)
TEST_DECLARE (threadpool_queue_work_einval)
TEST_DECLARE (threadpool_queue_recursive)
TEST_DECLARE (threadpool_task)
TEST_DECLARE (threadpool_multiple_event_loops)
TEST_DECLARE (threadpool_cancel_getaddrinfo)
TEST_DECLARE (threadpool_cancel_getnameinfo)
@@ -1130,6 +1131,7 @@ TASK_LIST_START
TEST_ENTRY (threadpool_queue_work_simple)
TEST_ENTRY (threadpool_queue_work_einval)
TEST_ENTRY (threadpool_queue_recursive)
TEST_ENTRY (threadpool_task)
TEST_ENTRY_CUSTOM (threadpool_multiple_event_loops, 0, 0, 60000)
TEST_ENTRY (threadpool_cancel_getaddrinfo)
TEST_ENTRY (threadpool_cancel_getnameinfo)
29 changes: 29 additions & 0 deletions test/test-threadpool.c
@@ -28,6 +28,7 @@ static int work_cb_count;
static int work_cb_count2;
static int after_work_cb_count;
static uv_work_t work_req;
static uv_sem_t task_sem;
static char data;


@@ -142,3 +143,31 @@ TEST_IMPL(threadpool_queue_recursive) {
MAKE_VALGRIND_HAPPY(uv_default_loop());
return 0;
}


static void task_cb(uv_work_t* req) {
/* TODO(trevnorris): This is to get around a race condition. */
uv_sleep(1);
if (++work_cb_count < RECURS_SIZE) {
ASSERT_EQ(uv_work_task(req, task_cb), 0);
return;
}
uv_sem_post(&task_sem);
free(req);
}


TEST_IMPL(threadpool_task) {
uv_work_t* req = malloc(sizeof(*req));

ASSERT_EQ(uv_sem_init(&task_sem, 0), 0);
ASSERT_EQ(uv_work_task(req, task_cb), 0);

uv_sem_wait(&task_sem);

ASSERT_EQ(work_cb_count, RECURS_SIZE);

uv_sem_destroy(&task_sem);

return 0;
}
