Skip to content

Commit

Permalink
threadpool: add thread_pool_new() and thread_pool_free()
Browse files Browse the repository at this point in the history
ThreadPool is tied to an AioContext through its event notifier, which
dictates in which AioContext the work item's callback function will be
invoked.

In order to support multiple AioContexts we need to support multiple
ThreadPool instances.

This patch adds the new/free functions.  The free function deserves
special attention because it quiesces remaining worker threads.  This
requires a new condition variable and a "stopping" flag to let workers
know they should terminate once idle.

We never needed to do this before since the global threadpool was not
explicitly destroyed until process termination.

Also stash the AioContext pointer in ThreadPool so that we can call
aio_set_event_notifier() in thread_pool_free().  We didn't need to hold
onto AioContext previously since there was no free function.

Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
Reviewed-by: Paolo Bonzini <pbonzini@redhat.com>
  • Loading branch information
stefanhaRH committed Mar 15, 2013
1 parent b811203 commit f7311cc
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 4 deletions.
5 changes: 5 additions & 0 deletions include/block/thread-pool.h
Expand Up @@ -26,6 +26,11 @@

typedef int ThreadPoolFunc(void *opaque);

typedef struct ThreadPool ThreadPool;

ThreadPool *thread_pool_new(struct AioContext *ctx);
void thread_pool_free(ThreadPool *pool);

BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
BlockDriverCompletionFunc *cb, void *opaque);
int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
Expand Down
52 changes: 48 additions & 4 deletions thread-pool.c
Expand Up @@ -24,8 +24,6 @@
#include "qemu/event_notifier.h"
#include "block/thread-pool.h"

typedef struct ThreadPool ThreadPool;

static void do_spawn_thread(ThreadPool *pool);

typedef struct ThreadPoolElement ThreadPoolElement;
Expand Down Expand Up @@ -59,8 +57,10 @@ struct ThreadPoolElement {

struct ThreadPool {
EventNotifier notifier;
AioContext *ctx;
QemuMutex lock;
QemuCond check_cancel;
QemuCond worker_stopped;
QemuSemaphore sem;
int max_threads;
QEMUBH *new_thread_bh;
Expand All @@ -75,6 +75,7 @@ struct ThreadPool {
int new_threads; /* backlog of threads we need to create */
int pending_threads; /* threads created but not running yet */
int pending_cancellations; /* whether we need a cond_broadcast */
bool stopping;
};

/* Currently there is only one thread pool instance. */
Expand All @@ -88,7 +89,7 @@ static void *worker_thread(void *opaque)
pool->pending_threads--;
do_spawn_thread(pool);

while (1) {
while (!pool->stopping) {
ThreadPoolElement *req;
int ret;

Expand All @@ -99,7 +100,7 @@ static void *worker_thread(void *opaque)
qemu_mutex_lock(&pool->lock);
pool->idle_threads--;
} while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
if (ret == -1) {
if (ret == -1 || pool->stopping) {
break;
}

Expand All @@ -124,6 +125,7 @@ static void *worker_thread(void *opaque)
}

pool->cur_threads--;
qemu_cond_signal(&pool->worker_stopped);
qemu_mutex_unlock(&pool->lock);
return NULL;
}
Expand Down Expand Up @@ -298,8 +300,10 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)

memset(pool, 0, sizeof(*pool));
event_notifier_init(&pool->notifier, false);
pool->ctx = ctx;
qemu_mutex_init(&pool->lock);
qemu_cond_init(&pool->check_cancel);
qemu_cond_init(&pool->worker_stopped);
qemu_sem_init(&pool->sem, 0);
pool->max_threads = 64;
pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
Expand All @@ -311,6 +315,46 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
thread_pool_active);
}

ThreadPool *thread_pool_new(AioContext *ctx)
{
ThreadPool *pool = g_new(ThreadPool, 1);
thread_pool_init_one(pool, ctx);
return pool;
}

void thread_pool_free(ThreadPool *pool)
{
if (!pool) {
return;
}

assert(QLIST_EMPTY(&pool->head));

qemu_mutex_lock(&pool->lock);

/* Stop new threads from spawning */
qemu_bh_delete(pool->new_thread_bh);
pool->cur_threads -= pool->new_threads;
pool->new_threads = 0;

/* Wait for worker threads to terminate */
pool->stopping = true;
while (pool->cur_threads > 0) {
qemu_sem_post(&pool->sem);
qemu_cond_wait(&pool->worker_stopped, &pool->lock);
}

qemu_mutex_unlock(&pool->lock);

aio_set_event_notifier(pool->ctx, &pool->notifier, NULL, NULL);
qemu_sem_destroy(&pool->sem);
qemu_cond_destroy(&pool->check_cancel);
qemu_cond_destroy(&pool->worker_stopped);
qemu_mutex_destroy(&pool->lock);
event_notifier_cleanup(&pool->notifier);
g_free(pool);
}

static void thread_pool_init(void)
{
thread_pool_init_one(&global_pool, NULL);
Expand Down

0 comments on commit f7311cc

Please sign in to comment.