Skip to content

Commit

Permalink
Remove WQ_UNLIMITED and change the limit of threads for WQ_DYNAMIC
Browse files Browse the repository at this point in the history
This patch deletes WQ_UNLIMITED option because, with this option,
sheep may consume a very large amount of memory (in some cases,
to be killed by OOM-killer) by creating worker threads infinitely.

This patch also changes the upper limit for the number of threads
for WQ_DYNAMIC option from "#nodes*2" to "max(#nodes,#cores,16)*2".

Signed-off-by: Teruaki Ishizaki <ishizaki.teruaki@lab.ntt.co.jp>
Signed-off-by: Takashi Menjo <menjo.takashi@lab.ntt.co.jp>
(cherry picked from commit cbe755a)

Conflicts:
	dog/benchmark.c
	tests/unit/lib/test_work.c
	tests/unit/sheep/test_recovery.c
  • Loading branch information
tmenjo committed Jan 24, 2017
1 parent d7d692e commit d464634
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 27 deletions.
1 change: 0 additions & 1 deletion include/work.h
Expand Up @@ -24,7 +24,6 @@ struct work_queue {
enum wq_thread_control {
WQ_ORDERED, /* Only 1 thread created for work queue */
WQ_DYNAMIC, /* # of threads proportional to nr_nodes created */
WQ_UNLIMITED, /* Unlimited # of threads created */
WQ_FIXED, /* Fixed # of threads created */
};

Expand Down
29 changes: 13 additions & 16 deletions lib/work.c
Expand Up @@ -68,6 +68,7 @@ static int efd;
static LIST_HEAD(wq_info_list);
static size_t nr_nodes = 1;
static size_t (*wq_get_nr_nodes)(void);
static size_t nr_cores = 1;

static void *worker_routine(void *arg);

Expand Down Expand Up @@ -202,11 +203,9 @@ static inline uint64_t wq_get_roof(struct wq_info *wi)
case WQ_ORDERED:
break;
case WQ_DYNAMIC:
/* FIXME: 2 * nr_nodes threads. No rationale yet. */
nr = nr_nodes * 2;
break;
case WQ_UNLIMITED:
nr = SIZE_MAX;
/* max(#nodes,#cores,16)*2 threads */
nr = (uint64_t)max(nr_nodes, nr_cores);
nr = max(nr, UINT64_C(16)) * 2;
break;
case WQ_FIXED:
nr = wi->nr_threads;
Expand Down Expand Up @@ -383,17 +382,6 @@ int init_work_queue(size_t (*get_nr_nodes)(void))
return 0;
}

/*
* Allowing unlimited threads to be created is necessary to solve the following
* problems:
*
* 1. timeout of IO requests from guests. With on-demand short threads, we
* guarantee that there is always one thread available to execute the
* request as soon as possible.
* 2. sheep halt for corner case that all gateway and io threads are executing
* local requests that ask for creation of another thread to execute the
* requests and sleep-wait for responses.
*/
struct work_queue *create_work_queue(const char *name,
enum wq_thread_control tc)
{
Expand Down Expand Up @@ -517,3 +505,12 @@ int sd_thread_join(sd_thread_t thread, void **retval)
return pthread_join(thread, retval);
}

static void __attribute__((constructor)) init_nr_cores(void)
{
const long nr = sysconf(_SC_NPROCESSORS_ONLN);
if (nr == -1L) {
fprintf(stderr, "cannot get the number of online processors\n");
exit(1);
}
nr_cores = (size_t)nr;
}
20 changes: 10 additions & 10 deletions sheep/sheep.c
Expand Up @@ -524,29 +524,29 @@ static int create_work_queues(void)
sd_info("# of threads in net workqueue: %d", wq_net_threads);
sys->net_wqueue = create_fixed_work_queue("net", wq_net_threads);
} else {
sd_info("net workqueue is created as unlimited, it is not recommended!");
sys->net_wqueue = create_work_queue("net", WQ_UNLIMITED);
sd_info("net workqueue is created as dynamic");
sys->net_wqueue = create_work_queue("net", WQ_DYNAMIC);
}
if (wq_gway_threads) {
sd_info("# of threads in gway workqueue: %d", wq_gway_threads);
sys->gateway_wqueue = create_fixed_work_queue("gway", wq_gway_threads);
} else {
sd_info("gway workqueue is created as unlimited, it is not recommended!");
sys->gateway_wqueue = create_work_queue("gway", WQ_UNLIMITED);
sd_info("gway workqueue is created as dynamic");
sys->gateway_wqueue = create_work_queue("gway", WQ_DYNAMIC);
}
if (wq_io_threads) {
sd_info("# of threads in io workqueue: %d", wq_io_threads);
sys->io_wqueue = create_fixed_work_queue("io", wq_io_threads);
} else {
sd_info("io workqueue is created as unlimited, it is not recommended!");
sys->io_wqueue = create_work_queue("io", WQ_UNLIMITED);
sd_info("io workqueue is created as dynamic");
sys->io_wqueue = create_work_queue("io", WQ_DYNAMIC);
}
if (wq_recovery_threads) {
sd_info("# of threads in rw workqueue: %d", wq_recovery_threads);
sys->recovery_wqueue = create_fixed_work_queue("rw", wq_recovery_threads);
} else {
sd_info("recovery workqueue is created as unlimited, it is not recommended!");
sys->recovery_wqueue = create_work_queue("rw", WQ_UNLIMITED);
sd_info("recovery workqueue is created as dynamic");
sys->recovery_wqueue = create_work_queue("rw", WQ_DYNAMIC);
}
sys->deletion_wqueue = create_ordered_work_queue("deletion");
sys->block_wqueue = create_ordered_work_queue("block");
Expand All @@ -555,8 +555,8 @@ static int create_work_queues(void)
sd_info("# of threads in async_req workqueue: %d", wq_async_threads);
sys->areq_wqueue = create_fixed_work_queue("async_req", wq_async_threads);
} else {
sd_info("async_req workqueue is created as unlimited, it is not recommended!");
sys->areq_wqueue = create_work_queue("async_req", WQ_UNLIMITED);
sd_info("async_req workqueue is created as dynamic");
sys->areq_wqueue = create_work_queue("async_req", WQ_DYNAMIC);
}
if (sys->enable_object_cache) {
sys->oc_reclaim_wqueue =
Expand Down

0 comments on commit d464634

Please sign in to comment.