Skip to content

Commit

Permalink
Merge pull request #351 from tmenjo/fix-max-dynamic-threads
Browse files Browse the repository at this point in the history
sheep: add a new option for setting threads for dynamic workqueues
  • Loading branch information
mitake committed Jan 23, 2017
2 parents 729e66a + c42b9d5 commit 810e4c6
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 15 deletions.
1 change: 1 addition & 0 deletions include/work.h
Expand Up @@ -66,6 +66,7 @@ struct work_queue *create_fixed_work_queue(const char *name, int nr_threads);
void queue_work(struct work_queue *q, struct work *work);
bool work_queue_empty(struct work_queue *q);
int wq_trace_init(void);
void set_max_dynamic_threads(size_t nr_max);

#ifdef HAVE_TRACE
void suspend_worker_threads(void);
Expand Down
54 changes: 39 additions & 15 deletions lib/work.c
Expand Up @@ -37,6 +37,9 @@
#define TRACEPOINT_DEFINE
#include "work_tp.h"

/* If this is greater than 0, the number of threads grows based on it. */
static size_t max_dynamic_threads = 0;

/*
* The protection period from shrinking work queue. This is necessary
* to avoid many calls of pthread_create. Without it, threads are
Expand Down Expand Up @@ -207,9 +210,13 @@ static inline uint64_t wq_get_roof(struct wq_info *wi)
case WQ_ORDERED:
break;
case WQ_DYNAMIC:
/* max(#nodes,#cores,16)*2 threads */
nr = (uint64_t)max(nr_nodes, nr_cores);
nr = max(nr, UINT64_C(16)) * 2;
if (max_dynamic_threads > 0) {
nr = (uint64_t)max_dynamic_threads;
} else {
/* max(#nodes,#cores,16)*2 threads */
nr = (uint64_t)max(nr_nodes, nr_cores);
nr = max(nr, UINT64_C(16)) * 2;
}
break;
case WQ_FIXED:
nr = wi->nr_threads;
Expand All @@ -220,19 +227,30 @@ static inline uint64_t wq_get_roof(struct wq_info *wi)
return nr;
}

static bool wq_need_grow(struct wq_info *wi)
/*
* Return non-zero if a given workqueue need to grow.
* The return value is the new number of threads.
*
* Otherwise, return zero.
*/
static size_t wq_need_grow(struct wq_info *wi)
{
size_t roof = 0;

if (wi->tc == WQ_FIXED)
return false;
return 0;

if (wi->nr_threads < uatomic_read(&wi->nr_queued_work) &&
wi->nr_threads * 2 <= wq_get_roof(wi)) {
wi->tm_end_of_protection = get_msec_time() +
WQ_PROTECTION_PERIOD;
return true;
}
/* do not need to grow if there are enough threads */
if (wi->nr_threads >= uatomic_read(&wi->nr_queued_work))
return 0;

return false;
/* cannot grow if # threads already reaches maximum */
roof = (size_t)wq_get_roof(wi);
if (wi->nr_threads >= roof)
return 0;

wi->tm_end_of_protection = get_msec_time() + WQ_PROTECTION_PERIOD;
return min(wi->nr_threads * 2, roof);
}

/*
Expand Down Expand Up @@ -274,15 +292,16 @@ static int create_worker_threads(struct wq_info *wi, size_t nr_threads)
void queue_work(struct work_queue *q, struct work *work)
{
struct wq_info *wi = container_of(q, struct wq_info, q);
size_t new_nr_threads = 0;

tracepoint(work, queue_work, wi, work);

uatomic_inc(&wi->nr_queued_work);
sd_mutex_lock(&wi->pending_lock);

if (wq_need_grow(wi))
/* double the thread pool size */
create_worker_threads(wi, wi->nr_threads * 2);
new_nr_threads = wq_need_grow(wi);
if (new_nr_threads > 0)
create_worker_threads(wi, new_nr_threads);

list_add_tail(&work->w_list, &wi->q.pending_list);
sd_mutex_unlock(&wi->pending_lock);
Expand Down Expand Up @@ -460,6 +479,11 @@ bool work_queue_empty(struct work_queue *q)
return uatomic_read(&wi->nr_queued_work) == 0;
}

void set_max_dynamic_threads(size_t nr_max)
{
max_dynamic_threads = nr_max;
}

struct thread_args {
const char *name;
void *(*start_routine)(void *);
Expand Down
13 changes: 13 additions & 0 deletions sheep/sheep.c
Expand Up @@ -155,6 +155,8 @@ static struct sd_option sheep_options[] = {
{'V', "vnodes", true, "set number of vnodes", vnodes_help},
{'w', "wq-threads", true, "specify a number of threads for workqueue"},
{'W', "wildcard-recovery", false, "wildcard recovery for first time"},
{'x', "max-dynamic-threads", true,
"specify the maximum number of threads for dynamic workqueue"},
{'y', "myaddr", true, "specify the address advertised to other sheep",
myaddr_help},
{'z', "zone", true,
Expand Down Expand Up @@ -703,6 +705,7 @@ int main(int argc, char **argv)
bool daemonize = true;
int32_t nr_vnodes = -1;
int64_t zone = -1;
uint32_t max_dynamic_threads = 0;
struct cluster_driver *cdrv;
struct option *long_options;
#ifdef HAVE_HTTP
Expand Down Expand Up @@ -879,6 +882,16 @@ int main(int argc, char **argv)
if (option_parse(optarg, ",", wq_parsers) < 0)
exit(1);
break;
case 'x':
max_dynamic_threads = str_to_u32(optarg);
if (errno != 0 || max_dynamic_threads < 1) {
sd_err("Invalid number of threads '%s': "
"must be an integer between 1 and %"PRIu32,
optarg, UINT32_MAX);
exit(1);
}
set_max_dynamic_threads((size_t)max_dynamic_threads);
break;
default:
usage(1);
break;
Expand Down

0 comments on commit 810e4c6

Please sign in to comment.