Skip to content

Commit

Permalink
Remove WQ_UNLIMITED and change the limit of threads for WQ_DYNAMIC
Browse files Browse the repository at this point in the history
This patch deletes WQ_UNLIMITED option because, with this option,
sheep may consume a very large amount of memory (in some cases,
to be killed by OOM-killer) by creating worker threads infinitely.

This patch also changes the upper limit for the number of threads
for WQ_DYNAMIC option from "#nodes*2" to "max(#nodes,#cores,16)*2".

Signed-off-by: Teruaki Ishizaki <ishizaki.teruaki@lab.ntt.co.jp>
Signed-off-by: Takashi Menjo <menjo.takashi@lab.ntt.co.jp>
(cherry picked from commit cbe755a)
  • Loading branch information
tmenjo committed Jan 23, 2017
1 parent 7d00020 commit 133c7cc
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 37 deletions.
5 changes: 1 addition & 4 deletions dog/benchmark.c
Expand Up @@ -99,17 +99,14 @@ static int benchmark_io(int argc, char **argv)
else if (!strcmp("dynamic",
benchmark_cmd_data.workqueue_type))
wq_type = WQ_DYNAMIC;
else if (!strcmp("unlimited",
benchmark_cmd_data.workqueue_type))
wq_type = WQ_UNLIMITED;
else if (!strcmp("fixed",
benchmark_cmd_data.workqueue_type))
wq_type = WQ_FIXED;
else {
sd_err("unknown workqueue type: %s",
benchmark_cmd_data.workqueue_type);
sd_err("assumed workqueue types:"
" ordered, dynamic, unlimited");
" ordered, dynamic");
return EXIT_SYSFAIL;
}
}
Expand Down
1 change: 0 additions & 1 deletion include/work.h
Expand Up @@ -25,7 +25,6 @@ struct work_queue {
enum wq_thread_control {
WQ_ORDERED, /* Only 1 thread created for work queue */
WQ_DYNAMIC, /* # of threads proportional to nr_nodes created */
WQ_UNLIMITED, /* Unlimited # of threads created */
WQ_FIXED, /* Fixed # of threads created */
};

Expand Down
29 changes: 13 additions & 16 deletions lib/work.c
Expand Up @@ -72,6 +72,7 @@ static int efd;
static LIST_HEAD(wq_info_list);
static size_t nr_nodes = 1;
static size_t (*wq_get_nr_nodes)(void);
static size_t nr_cores = 1;

static void *worker_routine(void *arg);

Expand Down Expand Up @@ -206,11 +207,9 @@ static inline uint64_t wq_get_roof(struct wq_info *wi)
case WQ_ORDERED:
break;
case WQ_DYNAMIC:
/* FIXME: 2 * nr_nodes threads. No rationale yet. */
nr = nr_nodes * 2;
break;
case WQ_UNLIMITED:
nr = SIZE_MAX;
/* max(#nodes,#cores,16)*2 threads */
nr = (uint64_t)max(nr_nodes, nr_cores);
nr = max(nr, UINT64_C(16)) * 2;
break;
case WQ_FIXED:
nr = wi->nr_threads;
Expand Down Expand Up @@ -393,17 +392,6 @@ int init_work_queue(size_t (*get_nr_nodes)(void))
return 0;
}

/*
* Allowing unlimited threads to be created is necessary to solve the following
* problems:
*
* 1. timeout of IO requests from guests. With on-demand short threads, we
* guarantee that there is always one thread available to execute the
* request as soon as possible.
* 2. sheep halt for corner case that all gateway and io threads are executing
* local requests that ask for creation of another thread to execute the
* requests and sleep-wait for responses.
*/
struct work_queue *create_work_queue(const char *name,
enum wq_thread_control tc)
{
Expand Down Expand Up @@ -528,3 +516,12 @@ int sd_thread_join(sd_thread_t thread, void **retval)
return pthread_join(thread, retval);
}

static void __attribute__((constructor)) init_nr_cores(void)
{
const long nr = sysconf(_SC_NPROCESSORS_ONLN);
if (nr == -1L) {
fprintf(stderr, "cannot get the number of online processors\n");
exit(1);
}
nr_cores = (size_t)nr;
}
20 changes: 10 additions & 10 deletions sheep/sheep.c
Expand Up @@ -500,29 +500,29 @@ static int create_work_queues(void)
sd_info("# of threads in net workqueue: %d", wq_net_threads);
sys->net_wqueue = create_fixed_work_queue("net", wq_net_threads);
} else {
sd_info("net workqueue is created as unlimited, it is not recommended!");
sys->net_wqueue = create_work_queue("net", WQ_UNLIMITED);
sd_info("net workqueue is created as dynamic");
sys->net_wqueue = create_work_queue("net", WQ_DYNAMIC);
}
if (wq_gway_threads) {
sd_info("# of threads in gway workqueue: %d", wq_gway_threads);
sys->gateway_wqueue = create_fixed_work_queue("gway", wq_gway_threads);
} else {
sd_info("gway workqueue is created as unlimited, it is not recommended!");
sys->gateway_wqueue = create_work_queue("gway", WQ_UNLIMITED);
sd_info("gway workqueue is created as dynamic");
sys->gateway_wqueue = create_work_queue("gway", WQ_DYNAMIC);
}
if (wq_io_threads) {
sd_info("# of threads in io workqueue: %d", wq_io_threads);
sys->io_wqueue = create_fixed_work_queue("io", wq_io_threads);
} else {
sd_info("io workqueue is created as unlimited, it is not recommended!");
sys->io_wqueue = create_work_queue("io", WQ_UNLIMITED);
sd_info("io workqueue is created as dynamic");
sys->io_wqueue = create_work_queue("io", WQ_DYNAMIC);
}
if (wq_recovery_threads) {
sd_info("# of threads in rw workqueue: %d", wq_recovery_threads);
sys->recovery_wqueue = create_fixed_work_queue("rw", wq_recovery_threads);
} else {
sd_info("recovery workqueue is created as unlimited, it is not recommended!");
sys->recovery_wqueue = create_work_queue("rw", WQ_UNLIMITED);
sd_info("recovery workqueue is created as dynamic");
sys->recovery_wqueue = create_work_queue("rw", WQ_DYNAMIC);
}
sys->deletion_wqueue = create_ordered_work_queue("deletion");
sys->block_wqueue = create_ordered_work_queue("block");
Expand All @@ -531,8 +531,8 @@ static int create_work_queues(void)
sd_info("# of threads in async_req workqueue: %d", wq_async_threads);
sys->areq_wqueue = create_fixed_work_queue("async_req", wq_async_threads);
} else {
sd_info("async_req workqueue is created as unlimited, it is not recommended!");
sys->areq_wqueue = create_work_queue("async_req", WQ_UNLIMITED);
sd_info("async_req workqueue is created as dynamic");
sys->areq_wqueue = create_work_queue("async_req", WQ_DYNAMIC);
}
if (!sys->gateway_wqueue || !sys->io_wqueue || !sys->recovery_wqueue ||
!sys->deletion_wqueue || !sys->block_wqueue || !sys->md_wqueue ||
Expand Down
5 changes: 0 additions & 5 deletions tests/unit/lib/test_work.c
Expand Up @@ -21,7 +21,6 @@ static void test_create_work_queue(void)
{
const char *name_o = "wq_ordered";
const char *name_d = "wq_dynamic";
const char *name_u = "wq_unlimited";
enum wq_thread_control tc = WQ_ORDERED;

wq = create_work_queue(name_o, tc);
Expand All @@ -30,10 +29,6 @@ static void test_create_work_queue(void)
tc = WQ_DYNAMIC;
wq = create_work_queue(name_d, tc);
TEST_ASSERT_NOT_NULL(wq);

tc = WQ_UNLIMITED;
wq = create_work_queue(name_u, tc);
TEST_ASSERT_NOT_NULL(wq);
}

static void test_queue_work(void)
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/sheep/test_recovery.c
Expand Up @@ -52,7 +52,7 @@ static void test_start_recovery()
/* create work queue */
init_event(EPOLL_SIZE);
init_work_queue(NULL);
sys->recovery_wqueue = create_work_queue("rw", WQ_UNLIMITED);
sys->recovery_wqueue = create_work_queue("rw", WQ_DYNAMIC);

INIT_LIST_HEAD(&sys->req_wait_queue);

Expand Down

0 comments on commit 133c7cc

Please sign in to comment.