Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Elastic threadpool RFC #382

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 121 additions & 63 deletions src/threadpool.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,18 @@ static void uv__req_init(uv_loop_t* loop,
uv__req_init((loop), (uv_req_t*)(req), (type))
#endif

#include <limits.h>
#include <stdlib.h>

#define MAX_THREADPOOL_SIZE 128
#define IDLE_THREAD_TIMEOUT 5e9 /* 5 seconds in nanoseconds. */
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arbitrary timeout. Maybe this should be configurable with an option for 'never'.


static uv_once_t once = UV_ONCE_INIT;
static uv_cond_t cond;
static uv_mutex_t mutex;
static unsigned int nthreads;
static uv_thread_t* threads;
static uv_thread_t default_threads[4];
static int busy_threads;
static int num_threads;
static int max_threads;
static QUEUE exit_message;
static QUEUE wq;
static volatile int initialized;
Expand All @@ -57,104 +59,132 @@ static void uv__cancelled(struct uv__work* w) {
}


/* Only call when you hold NO locks. */
static void do_work(struct uv__work* w) {
w->work(w);
uv_mutex_lock(&w->loop->wq_mutex);
w->work = NULL; /* Signal uv_cancel() that the work req is done executing. */
QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
uv_async_send(&w->loop->wq_async);
uv_mutex_unlock(&w->loop->wq_mutex);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be possible to avoid taking out the lock here when running in synchronous (i.e. zero threads) mode.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could the uv_mutex_unlock() be moved above the uv_async_send()? Thought that call was thread friendly.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It probably can, but... I wrote it this way because otherwise it's possible for the per-loop work queue to get drained between the calls to uv_mutex_unlock() and uv_async_send(), and that would wake up the event loop when there is no work available.

It's probably no big deal but I figured: why make things more complex than they already are?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't consider the interaction byproduct of the tail insert and waking up the event loop. Need to go fix some of my own code now.

}


/* To avoid deadlock with uv_cancel() it's crucial that the worker
* never holds the global mutex and the loop-local mutex at the same time.
*/
static void worker(void* arg) {
struct uv__work* w;
#ifdef _WIN32
static unsigned __stdcall worker(void* arg) {
#else
static void* worker(void* arg) {
#endif
QUEUE* q;

(void) arg;
(void) &arg;
uv_mutex_lock(&mutex);

for (;;) {
uv_mutex_lock(&mutex);
if (QUEUE_EMPTY(&wq))
uv_cond_timedwait(&cond, &mutex, IDLE_THREAD_TIMEOUT);

while (QUEUE_EMPTY(&wq))
uv_cond_wait(&cond, &mutex);
if (QUEUE_EMPTY(&wq))
break; /* Timed out, nothing to do, exit thread. */

q = QUEUE_HEAD(&wq);

if (q == &exit_message)
uv_cond_signal(&cond);
else {
QUEUE_REMOVE(q);
QUEUE_INIT(q); /* Signal uv_cancel() that the work req is
executing. */
if (q == &exit_message) {
uv_cond_signal(&cond); /* Propagate exit message to next thread. */
break;
}

uv_mutex_unlock(&mutex);
QUEUE_REMOVE(q);
QUEUE_INIT(q); /* Signal uv_cancel() that the work req is executing. */

if (q == &exit_message)
break;
busy_threads += 1;
uv_mutex_unlock(&mutex);
do_work(QUEUE_DATA(q, struct uv__work, wq));
uv_mutex_lock(&mutex);
busy_threads -= 1;
}

w = QUEUE_DATA(q, struct uv__work, wq);
w->work(w);
assert(num_threads > INT_MIN); /* Extremely paranoid sanity check. */
num_threads -= 1;
uv_mutex_unlock(&mutex);

uv_mutex_lock(&w->loop->wq_mutex);
w->work = NULL; /* Signal uv_cancel() that the work req is done
executing. */
QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
uv_async_send(&w->loop->wq_async);
uv_mutex_unlock(&w->loop->wq_mutex);
}
return 0;
}


static void post(QUEUE* q) {
uv_mutex_lock(&mutex);
QUEUE_INSERT_TAIL(&wq, q);
uv_cond_signal(&cond);
uv_mutex_unlock(&mutex);
static int new_detached_thread(void) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should uv_thread_self work if called from a worker thread? We use a TLS key for that, which wouldn't be set up in this case.

Maybe we can use uv_thread_create and fiddle with the internals here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't realize the TLS was used for that. There is one reason (well, two) why I implemented it like this:

  1. uv_thread_create() on Windows creates the thread in suspended mode, then calls ResumeThread(). I don't quite understand why it does that so I decided not to mess with it.
  2. uv_thread_create() calls malloc() and free() from different threads, which might be slow. For example, glibc has to acquire a process-global lock when the free happens on a different thread.

We can just try it and see how well it works.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

on Windows creates the thread in suspended mode, then calls ResumeThread(). I don't quite understand why it does that so I decided not to mess with it.

It does it so it can store the thread handle as the current thread in TLS.

We can just try it and see how well it works.

Sounds reasonable.

#ifdef _WIN32
  /* Windows: start worker() on a new thread and immediately give up our
   * handle to it. Returns 0 on success or a negated errno value when the
   * thread could not be created.
   */
  HANDLE thread;

  thread = (HANDLE) _beginthreadex(NULL, 0, worker, NULL, 0, NULL);
  if (thread == NULL)
    return -errno;

  /* CloseHandle() returns nonzero on success and zero on failure, so only a
   * zero result is fatal. (The check used to be inverted, which aborted the
   * process every time a thread was created successfully.)
   */
  if (!CloseHandle(thread))
    abort();

  return 0;
#else
  /* Elsewhere: start worker() on a new pthread and detach it. Returns 0 on
   * success or the negated pthread_create() error number on failure.
   */
  pthread_t thread;
  int err;

  err = pthread_create(&thread, NULL, worker, NULL);
  if (err)
    return -err;

  /* Failing to detach a thread we just created is treated as fatal. */
  if (pthread_detach(thread))
    abort();

  return 0;
#endif
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be removed if/when libuv grows a uv_thread_detach() function. I can do that in a follow-up PR.



#ifndef _WIN32
UV_DESTRUCTOR(static void cleanup(void)) {
unsigned int i;

if (initialized == 0)
return;

post(&exit_message);
uv_mutex_lock(&mutex);

for (i = 0; i < nthreads; i++)
if (uv_thread_join(threads + i))
abort();
/* Tell threads to terminate. */
if (num_threads > 0) {
QUEUE_INSERT_TAIL(&wq, &exit_message);
uv_cond_signal(&cond);
}

if (threads != default_threads)
uv__free(threads);
/* Wait for threads to terminate. */
while (num_threads > 0) {
uv_cond_wait(&cond, &mutex);
uv_cond_signal(&cond);
}

uv_mutex_unlock(&mutex);
uv_mutex_destroy(&mutex);
uv_cond_destroy(&cond);

threads = NULL;
nthreads = 0;
initialized = 0;
}
#endif


static void init_once(void) {
unsigned int i;
const char* val;

nthreads = ARRAY_SIZE(default_threads);
max_threads = 8; /* FIXME(bnoordhuis) Arbitrary default. */
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arbitrary default limit. Suggestions on how to pick a better one at run-time are welcome.

max_threads = sysconf(_SC_NPROCESSORS_ONLN) or maybe factor in the system load?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean it would re-calculate max_threads based on the system load every time this runs, or just at startup?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code only runs once but I get your point. Yes, libuv would periodically query the system load and adjust max_threads accordingly. I haven't really thought out the details yet, though.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given this threadpool is used for blocking i/o, I guess we could start with some more threads than the processor count.

As for re-calculating the maximum amount, I think I'd rather give the user the ability to do it. They can decide when and by how much to resize.

val = getenv("UV_THREADPOOL_SIZE");
if (val != NULL)
nthreads = atoi(val);
if (nthreads == 0)
nthreads = 1;
if (nthreads > MAX_THREADPOOL_SIZE)
nthreads = MAX_THREADPOOL_SIZE;

threads = default_threads;
if (nthreads > ARRAY_SIZE(default_threads)) {
threads = uv__malloc(nthreads * sizeof(threads[0]));
if (threads == NULL) {
nthreads = ARRAY_SIZE(default_threads);
threads = default_threads;
}
}
max_threads = atoi(val);
if (max_threads < 1)
max_threads = 1;
if (max_threads > MAX_THREADPOOL_SIZE)
max_threads = MAX_THREADPOOL_SIZE;
/* Allow forcing synchronous mode for testing purposes. */
if (val != NULL && strcmp(val, "really-zero") == 0)
max_threads = 0;

if (uv_cond_init(&cond))
abort();
Expand All @@ -164,10 +194,6 @@ static void init_once(void) {

QUEUE_INIT(&wq);

for (i = 0; i < nthreads; i++)
if (uv_thread_create(threads + i, worker, NULL))
abort();

initialized = 1;
}

Expand All @@ -176,11 +202,43 @@ void uv__work_submit(uv_loop_t* loop,
struct uv__work* w,
void (*work)(struct uv__work* w),
void (*done)(struct uv__work* w, int status)) {
int err;

uv_once(&once, init_once);
w->loop = loop;
w->work = work;
w->done = done;
post(&w->wq);

uv_mutex_lock(&mutex);

if (busy_threads >= num_threads && num_threads < max_threads) {
/* Drop the lock for a moment, starting a thread can be slow. */
uv_mutex_unlock(&mutex);
err = new_detached_thread();
uv_mutex_lock(&mutex);

/* There is technically a race window between new_detached_thread() where
* the new thread exits and decrements num_threads before we increment it
* here. It's a benign race, though; worst case, num_threads is less than
* zero for a short time. We just take that in stride, the important part
* is that num_threads > 0 means there is a thread to service the work item.
*/
if (err == 0)
num_threads += 1;
}

if (num_threads > 0) {
QUEUE_INSERT_TAIL(&wq, &w->wq);
uv_cond_signal(&cond);
uv_mutex_unlock(&mutex);
} else {
/* If we can't start a thread, fall back to doing the work synchronously.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I like this. I guess chances of it happening are low, but the user would never know. How about we return an error?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning an error means all call sites need to be updated to handle errors.

The hairy part is cleanup. There will be memory leaks all over the place unless fs.c, getaddrinfo.c and getnameinfo.c go through extensive rewrites.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hum, I haven't looked deeply into it, but how would the error handling (from the user's perspective) be different than when a sync operation fails? The user still needs to call uv_fs_cleanup which will do the needful, right?

* Performance will be degraded but at least we make forward progress.
*/
QUEUE_INIT(&w->wq);
uv_mutex_unlock(&mutex);
do_work(w);
}
}


Expand Down
29 changes: 29 additions & 0 deletions test/test-threadpool-cancel.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
#include "uv.h"
#include "task.h"

#include <stdlib.h>
#include <string.h>

#define INIT_CANCEL_INFO(ci, what) \
do { \
(ci)->reqs = (what); \
Expand All @@ -48,6 +51,12 @@ static unsigned done2_cb_called;
static unsigned timer_cb_called;


/* Returns 1 when the UV_THREADPOOL_SIZE environment variable is set to
 * exactly "really-zero", and 0 otherwise (including when it is unset).
 */
static int no_threadpool(void) {
  const char* size_env;

  size_env = getenv("UV_THREADPOOL_SIZE");
  if (size_env == NULL)
    return 0;

  return strcmp(size_env, "really-zero") == 0;
}


static void work_cb(uv_work_t* req) {
uv_mutex_lock(&signal_mutex);
uv_cond_signal(&signal_cond);
Expand Down Expand Up @@ -182,6 +191,10 @@ TEST_IMPL(threadpool_cancel_getaddrinfo) {
uv_loop_t* loop;
int r;

/* Work requests are executed synchronously when there is no threadpool. */
if (no_threadpool())
RETURN_SKIP("Cancellation doesn't work in synchronous work mode.");

INIT_CANCEL_INFO(&ci, reqs);
loop = uv_default_loop();
saturate_threadpool();
Expand Down Expand Up @@ -217,6 +230,10 @@ TEST_IMPL(threadpool_cancel_getnameinfo) {
uv_loop_t* loop;
int r;

/* Work requests are executed synchronously when there is no threadpool. */
if (no_threadpool())
RETURN_SKIP("Cancellation doesn't work in synchronous work mode.");

r = uv_ip4_addr("127.0.0.1", 80, &addr4);
ASSERT(r == 0);

Expand Down Expand Up @@ -254,6 +271,10 @@ TEST_IMPL(threadpool_cancel_work) {
uv_loop_t* loop;
unsigned i;

/* Work requests are executed synchronously when there is no threadpool. */
if (no_threadpool())
RETURN_SKIP("Cancellation doesn't work in synchronous work mode.");

INIT_CANCEL_INFO(&ci, reqs);
loop = uv_default_loop();
saturate_threadpool();
Expand All @@ -280,6 +301,10 @@ TEST_IMPL(threadpool_cancel_fs) {
uv_loop_t* loop;
unsigned n;

/* Work requests are executed synchronously when there is no threadpool. */
if (no_threadpool())
RETURN_SKIP("Cancellation doesn't work in synchronous work mode.");

INIT_CANCEL_INFO(&ci, reqs);
loop = uv_default_loop();
saturate_threadpool();
Expand Down Expand Up @@ -332,6 +357,10 @@ TEST_IMPL(threadpool_cancel_single) {
int cancelled;
int i;

/* Work requests are executed synchronously when there is no threadpool. */
if (no_threadpool())
RETURN_SKIP("Cancellation doesn't work in synchronous work mode.");

loop = uv_default_loop();
for (i = 0; i < 5000; i++) {
req.data = NULL;
Expand Down