Skip to content

Commit

Permalink
WIP add uv_work_task()
Browse files Browse the repository at this point in the history
Proof-of-concept to demonstrate the API of being able to queue work from
any thread, without needing to be attached to a specific event loop.

There's a problem when running the threadpool_task test: uv_library_shutdown()
runs even though it's never called explicitly, which causes the test to
abort. (The __run_exit_handlers frame suggests it is being invoked from an
atexit handler.) Here's the stack:

    * thread #1, name = 'uv_run_tests_a', stop reason = signal SIGABRT
        frame libuv#6: 0x00000000004ce768 uv_run_tests_a`uv__threadpool_cleanup at threadpool.c:192:3
        frame libuv#7: 0x00000000004d5d56 uv_run_tests_a`uv_library_shutdown at uv-common.c:956:3
        frame libuv#8: 0x00007ffff7fc924e
        frame libuv#9: 0x00007ffff7c45495 libc.so.6`__run_exit_handlers(status=0, listp=<unavailable>, run_list_atexit=true, run_dtors=true) at exit.c:113:8

Can't figure out why this is happening.

Another problem: if the uv_sleep() call is removed from task_cb(), there's
another abort when trying to lock the mutex. Here's the stack:

        frame libuv#8: 0x00000000004ed375 uv_run_tests_a`uv_mutex_lock(mutex=0x0000000001460c50) at thread.c:344:7
        frame libuv#9: 0x00000000004cebdf uv_run_tests_a`uv__queue_work(w=0x0000000001462378) at threadpool.c:342:5
    (lldb) f 9
    frame libuv#9: 0x00000000004cebdf uv_run_tests_a`uv__queue_work(w=0x0000000001462378) at threadpool.c:342:5
       341    if (w->done == NULL) {
    -> 342      uv_mutex_lock(&mutex);
       343      w->work = NULL;

Still not sure if this is a race condition.
  • Loading branch information
trevnorris committed Mar 31, 2023
1 parent 4731fec commit 6d687d7
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 4 deletions.
4 changes: 4 additions & 0 deletions include/uv.h
Expand Up @@ -1100,6 +1100,10 @@ UV_EXTERN int uv_queue_work(uv_loop_t* loop,
uv_work_t* req,
uv_work_cb work_cb,
uv_after_work_cb after_work_cb);
UV_EXTERN int uv_work_task(uv_work_t* req, uv_work_cb work_cb);
UV_EXTERN int uv_work_done(uv_loop_t* loop,
uv_work_t* req,
uv_after_work_cb after_work_cb);

UV_EXTERN int uv_cancel(uv_req_t* req);

Expand Down
51 changes: 47 additions & 4 deletions src/threadpool.c
Expand Up @@ -121,6 +121,11 @@ static void worker(void* arg) {
w = QUEUE_DATA(q, struct uv__work, wq);
w->work(w);

/* If no event loop is attached to this work then there's no need to
* continue. */
if (w->loop == NULL)
continue;

uv_mutex_lock(&w->loop->wq_mutex);
w->work = NULL; /* Signal uv_cancel() that the work req is done
executing. */
Expand Down Expand Up @@ -327,10 +332,16 @@ void uv__work_done(uv_async_t* handle) {
/* Invoke the user's work callback for a threadpool request.
 * Runs on a worker thread. */
static void uv__queue_work(struct uv__work* w) {
  uv_work_t* req = container_of(w, uv_work_t, work_req);

  /* Remove the req from the queue if it was supplied using uv_work_task().
   * Such requests have no done callback (w->done == NULL) and no event loop
   * attached, so nothing would dequeue them later; removing them here before
   * work_cb runs presumably also allows the user to free() the req from
   * inside work_cb — confirm against uv_work_task()'s contract. */
  /* TODO(trevnorris): If uv_cancel() is called between when this function was
   * called and this lock, then it'll still run work_cb(). Should we add a
   * check once the lock is acquired to see if that's happened? */
  if (w->done == NULL) {
    uv_mutex_lock(&mutex);
    w->work = NULL;  /* NOTE(review): looks like this signals uv_cancel() that
                      * the req is already executing — confirm. */
    QUEUE_REMOVE(&w->wq);
    uv_mutex_unlock(&mutex);
  }

  req->work_cb(req);
}
Expand Down Expand Up @@ -369,6 +380,38 @@ int uv_queue_work(uv_loop_t* loop,
}


/* Queue a one-shot work request on the threadpool without attaching it to
 * any event loop.  The req is removed from the internal queue before work_cb
 * is invoked, so the caller may free() the req from within work_cb.
 *
 * Returns 0 on success, UV_EINVAL if req or work_cb is NULL. */
int uv_work_task(uv_work_t* req, uv_work_cb work_cb) {
  if (req == NULL || work_cb == NULL)
    return UV_EINVAL;

  /* Don't use uv__req_init() since we're not increasing the active req count
   * on any given uv_loop_t. */
  UV_REQ_INIT(req, UV_WORK);
  req->work_cb = work_cb;
  /* Submitting with a NULL done callback makes uv__queue_work() remove the
   * req from the queue before calling the uv_work_cb. */
  uv__work_submit(NULL,
                  &req->work_req,
                  UV__WORK_CPU,
                  uv__queue_work,
                  NULL);

  return 0;
}


/* Intended to register an after-work callback on `loop` for a req that was
 * queued with uv_work_task().  NOT YET IMPLEMENTED: arguments are validated
 * and then ignored, and 0 is returned unconditionally.
 *
 * Returns UV_EINVAL if after_work_cb is NULL. */
int uv_work_done(uv_loop_t* loop,
                 uv_work_t* req,
                 uv_after_work_cb after_work_cb) {
  if (after_work_cb == NULL)
    return UV_EINVAL;

  /* TODO(trevnorris): Finish. */
  (void) loop;  /* Silence unused-parameter warnings until implemented. */
  (void) req;

  return 0;
}


int uv_cancel(uv_req_t* req) {
struct uv__work* wreq;
uv_loop_t* loop;
Expand Down
2 changes: 2 additions & 0 deletions test/test-list.h
Expand Up @@ -445,6 +445,7 @@ TEST_DECLARE (strtok)
TEST_DECLARE (threadpool_queue_work_simple)
TEST_DECLARE (threadpool_queue_work_einval)
TEST_DECLARE (threadpool_queue_recursive)
TEST_DECLARE (threadpool_task)
TEST_DECLARE (threadpool_multiple_event_loops)
TEST_DECLARE (threadpool_cancel_getaddrinfo)
TEST_DECLARE (threadpool_cancel_getnameinfo)
Expand Down Expand Up @@ -1130,6 +1131,7 @@ TASK_LIST_START
TEST_ENTRY (threadpool_queue_work_simple)
TEST_ENTRY (threadpool_queue_work_einval)
TEST_ENTRY (threadpool_queue_recursive)
TEST_ENTRY (threadpool_task)
TEST_ENTRY_CUSTOM (threadpool_multiple_event_loops, 0, 0, 60000)
TEST_ENTRY (threadpool_cancel_getaddrinfo)
TEST_ENTRY (threadpool_cancel_getnameinfo)
Expand Down
29 changes: 29 additions & 0 deletions test/test-threadpool.c
Expand Up @@ -28,6 +28,7 @@ static int work_cb_count;
static int work_cb_count2;
static int after_work_cb_count;
static uv_work_t work_req;
static uv_sem_t task_sem;
static char data;


Expand Down Expand Up @@ -142,3 +143,31 @@ TEST_IMPL(threadpool_queue_recursive) {
MAKE_VALGRIND_HAPPY(uv_default_loop());
return 0;
}


/* Work callback that re-queues itself until it has run RECURS_SIZE times,
 * then frees the req and signals the main thread.  Runs on a threadpool
 * worker thread. */
static void task_cb(uv_work_t* req) {
  /* TODO(trevnorris): This is to get around a race condition. */
  uv_sleep(1);
  if (++work_cb_count < RECURS_SIZE) {
    ASSERT_EQ(uv_work_task(req, task_cb), 0);
    return;
  }
  /* Free the req BEFORE posting the semaphore: once the main thread is
   * released it may tear the test down, so the worker must not touch req
   * after the post. */
  free(req);
  uv_sem_post(&task_sem);
}


/* Exercise uv_work_task(): queue a req that re-queues itself RECURS_SIZE
 * times, then wait for the worker to signal completion. */
TEST_IMPL(threadpool_task) {
  uv_work_t* req = malloc(sizeof(*req));

  /* malloc can fail; don't hand a NULL req to the threadpool. */
  ASSERT_NOT_NULL(req);

  ASSERT_EQ(uv_sem_init(&task_sem, 0), 0);
  ASSERT_EQ(uv_work_task(req, task_cb), 0);

  /* Blocks until task_cb has run RECURS_SIZE times; task_cb frees req. */
  uv_sem_wait(&task_sem);

  ASSERT_EQ(work_cb_count, RECURS_SIZE);

  uv_sem_destroy(&task_sem);

  return 0;
}

0 comments on commit 6d687d7

Please sign in to comment.