Skip to content

Commit

Permalink
Merge branch 'for-v0.9.5-fix-max-threads-2' into for-v0.9.5-2
Browse files Browse the repository at this point in the history
  • Loading branch information
tmenjo committed Jan 24, 2017
2 parents 144ea5d + fc7d12d commit 28a2b12
Show file tree
Hide file tree
Showing 5 changed files with 227 additions and 42 deletions.
2 changes: 2 additions & 0 deletions include/util.h
Expand Up @@ -580,4 +580,6 @@ char *xstrdup(const char *s);
struct work_queue;
void register_util_wq(struct work_queue *wq);

uint32_t str_to_u32(const char *nptr);
uint16_t str_to_u16(const char *nptr);
#endif
4 changes: 3 additions & 1 deletion include/work.h
Expand Up @@ -24,7 +24,7 @@ struct work_queue {
enum wq_thread_control {
WQ_ORDERED, /* Only 1 thread created for work queue */
WQ_DYNAMIC, /* # of threads proportional to nr_nodes created */
WQ_UNLIMITED, /* Unlimited # of threads created */
WQ_FIXED, /* Fixed # of threads created */
};

static inline bool is_main_thread(void)
Expand Down Expand Up @@ -61,9 +61,11 @@ static inline bool is_worker_thread(void)
int init_work_queue(size_t (*get_nr_nodes)(void));
struct work_queue *create_work_queue(const char *name, enum wq_thread_control);
struct work_queue *create_ordered_work_queue(const char *name);
struct work_queue *create_fixed_work_queue(const char *name, int nr_threads);
void queue_work(struct work_queue *q, struct work *work);
bool work_queue_empty(struct work_queue *q);
int wq_trace_init(void);
void set_max_dynamic_threads(size_t nr_max);

#ifdef HAVE_TRACE
void suspend_worker_threads(void);
Expand Down
42 changes: 42 additions & 0 deletions lib/util.c
Expand Up @@ -962,3 +962,45 @@ char *xstrdup(const char *s)
return ret;
}

/*
* Convert a decimal string like as strtoll to uint32_t/uint16_t
*
* returns:
* - a converted value if success i.e. neither negative value nor overflow
* - undefined if something went wrong and set errno accordingly
*
* errno:
* - 0 if success
* - EINVAL if one of the following:
* - nptr was an empty string
* - there was an unconvertible character in nptr
* - ERANGE if negative/positive overflow occurred
*/
uint32_t str_to_u32(const char *nptr)
{
char *endptr;
errno = 0;
const long long conv = strtoll(nptr, &endptr, 10);
/* empty string or unconvertible character */
if (nptr == endptr || *endptr != '\0') {
errno = EINVAL;
return (uint32_t)conv;
}
/* negative value or overflow */
if (conv < 0LL || UINT32_MAX < conv) {
errno = ERANGE;
return UINT32_MAX;
}
return (uint32_t)conv;
}

uint16_t str_to_u16(const char *nptr)
{
const uint32_t conv = str_to_u32(nptr);
/* overflow */
if (UINT16_MAX < conv) {
errno = ERANGE;
return UINT16_MAX;
}
return (uint16_t)conv;
}
109 changes: 80 additions & 29 deletions lib/work.c
Expand Up @@ -33,6 +33,9 @@
#include "work.h"
#include "event.h"

/* If this is greater than 0, the number of threads grows based on it. */
static size_t max_dynamic_threads = 0;

/*
* The protection period from shrinking work queue. This is necessary
* to avoid many calls of pthread_create. Without it, threads are
Expand Down Expand Up @@ -68,6 +71,7 @@ static int efd;
static LIST_HEAD(wq_info_list);
static size_t nr_nodes = 1;
static size_t (*wq_get_nr_nodes)(void);
static size_t nr_cores = 1;

static void *worker_routine(void *arg);

Expand Down Expand Up @@ -202,28 +206,47 @@ static inline uint64_t wq_get_roof(struct wq_info *wi)
case WQ_ORDERED:
break;
case WQ_DYNAMIC:
/* FIXME: 2 * nr_nodes threads. No rationale yet. */
nr = nr_nodes * 2;
if (max_dynamic_threads > 0) {
nr = (uint64_t)max_dynamic_threads;
} else {
/* max(#nodes,#cores,16)*2 threads */
nr = (uint64_t)max(nr_nodes, nr_cores);
nr = max(nr, UINT64_C(16)) * 2;
}
break;
case WQ_UNLIMITED:
nr = SIZE_MAX;
case WQ_FIXED:
nr = wi->nr_threads;
break;
default:
panic("Invalid threads control %d", wi->tc);
}
return nr;
}

static bool wq_need_grow(struct wq_info *wi)
/*
* Return non-zero if a given workqueue need to grow.
* The return value is the new number of threads.
*
* Otherwise, return zero.
*/
static size_t wq_need_grow(struct wq_info *wi)
{
if (wi->nr_threads < uatomic_read(&wi->nr_queued_work) &&
wi->nr_threads * 2 <= wq_get_roof(wi)) {
wi->tm_end_of_protection = get_msec_time() +
WQ_PROTECTION_PERIOD;
return true;
}
size_t roof = 0;

return false;
if (wi->tc == WQ_FIXED)
return 0;

/* do not need to grow if there are enough threads */
if (wi->nr_threads >= uatomic_read(&wi->nr_queued_work))
return 0;

/* cannot grow if # threads already reaches maximum */
roof = (size_t)wq_get_roof(wi);
if (wi->nr_threads >= roof)
return 0;

wi->tm_end_of_protection = get_msec_time() + WQ_PROTECTION_PERIOD;
return min(wi->nr_threads * 2, roof);
}

/*
Expand All @@ -232,6 +255,9 @@ static bool wq_need_grow(struct wq_info *wi)
*/
static bool wq_need_shrink(struct wq_info *wi)
{
if (wi->tc == WQ_FIXED)
return false;

if (uatomic_read(&wi->nr_queued_work) < wi->nr_threads / 2)
/* we cannot shrink work queue during protection period. */
return wi->tm_end_of_protection <= get_msec_time();
Expand Down Expand Up @@ -262,13 +288,14 @@ static int create_worker_threads(struct wq_info *wi, size_t nr_threads)
void queue_work(struct work_queue *q, struct work *work)
{
struct wq_info *wi = container_of(q, struct wq_info, q);
size_t new_nr_threads = 0;

uatomic_inc(&wi->nr_queued_work);
sd_mutex_lock(&wi->pending_lock);

if (wq_need_grow(wi))
/* double the thread pool size */
create_worker_threads(wi, wi->nr_threads * 2);
new_nr_threads = wq_need_grow(wi);
if (new_nr_threads > 0)
create_worker_threads(wi, new_nr_threads);

list_add_tail(&work->w_list, &wi->q.pending_list);
sd_mutex_unlock(&wi->pending_lock);
Expand Down Expand Up @@ -374,17 +401,6 @@ int init_work_queue(size_t (*get_nr_nodes)(void))
return 0;
}

/*
* Allowing unlimited threads to be created is necessary to solve the following
* problems:
*
* 1. timeout of IO requests from guests. With on-demand short threads, we
* guarantee that there is always one thread available to execute the
* request as soon as possible.
* 2. sheep halt for corner case that all gateway and io threads are executing
* local requests that ask for creation of another thread to execute the
* requests and sleep-wait for responses.
*/
struct work_queue *create_work_queue(const char *name,
enum wq_thread_control tc)
{
Expand All @@ -403,9 +419,11 @@ struct work_queue *create_work_queue(const char *name,
sd_init_mutex(&wi->finished_lock);
sd_init_mutex(&wi->pending_lock);

ret = create_worker_threads(wi, 1);
if (ret < 0)
goto destroy_threads;
if (tc != WQ_FIXED) {
ret = create_worker_threads(wi, 1);
if (ret < 0)
goto destroy_threads;
}

list_add(&wi->list, &wq_info_list);

Expand All @@ -424,13 +442,37 @@ struct work_queue *create_ordered_work_queue(const char *name)
return create_work_queue(name, WQ_ORDERED);
}

struct work_queue *create_fixed_work_queue(const char *name, int nr_threads)
{
struct work_queue *wq;
struct wq_info *wi;
int ret;

wq = create_work_queue(name, WQ_FIXED);
if (!wq)
return NULL;

wi = container_of(wq, struct wq_info, q);
ret = create_worker_threads(wi, nr_threads);
if (ret) {
panic("failed to create a fixed workqueue: %s", name);
}

return wq;
}

bool work_queue_empty(struct work_queue *q)
{
struct wq_info *wi = container_of(q, struct wq_info, q);

return uatomic_read(&wi->nr_queued_work) == 0;
}

void set_max_dynamic_threads(size_t nr_max)
{
max_dynamic_threads = nr_max;
}

struct thread_args {
const char *name;
void *(*start_routine)(void *);
Expand Down Expand Up @@ -487,3 +529,12 @@ int sd_thread_join(sd_thread_t thread, void **retval)
return pthread_join(thread, retval);
}

static void __attribute__((constructor)) init_nr_cores(void)
{
const long nr = sysconf(_SC_NPROCESSORS_ONLN);
if (nr == -1L) {
fprintf(stderr, "cannot get the number of online processors\n");
exit(1);
}
nr_cores = (size_t)nr;
}

0 comments on commit 28a2b12

Please sign in to comment.