Skip to content

Commit

Permalink
io: add qio_task_wait_thread to join with a background thread
Browse files Browse the repository at this point in the history
Add the ability for a caller to wait for completion of the
background thread to synchronously dispatch its result, without
needing to wait for the main loop to run the idle callback.

This method needs very careful usage to avoid a dangerous
race condition with the free'ing of the task. The completion
callback is normally invoked from an idle callback registered
with the main loop context. The qio_task_wait_thread method
must only be called if the completion callback has not yet
run. The only safe way to achieve this is to run the
qio_task_wait_thread method from the thread that executes
the main loop.

It is generally a bad idea to use this method since it will
block execution of the main loop, however, the design of
the character devices and its usage from vhostuser already
requires blocking execution.

Signed-off-by: Daniel P. Berrangé <berrange@redhat.com>
Message-Id: <20190211182442.8542-3-berrange@redhat.com>
Signed-off-by: Marc-André Lureau <marcandre.lureau@redhat.com>
  • Loading branch information
berrange authored and elmarco committed Feb 12, 2019
1 parent 52d6cfe commit dbb4450
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 5 deletions.
29 changes: 28 additions & 1 deletion include/io/task.h
Expand Up @@ -232,14 +232,41 @@ QIOTask *qio_task_new(Object *source,
*
* Run a task in a background thread. When @worker
* returns it will call qio_task_complete() in
* the event thread context that provided.
* the thread that is running the main loop associated
* with @context.
*/
void qio_task_run_in_thread(QIOTask *task,
QIOTaskWorker worker,
gpointer opaque,
GDestroyNotify destroy,
GMainContext *context);


/**
* qio_task_wait_thread:
* @task: the task struct
*
* Wait for completion of a task that was previously
* invoked using qio_task_run_in_thread. This MUST
* ONLY be invoked if the task has not already
* completed, since after the completion callback
* is invoked, @task will have been freed.
*
* To avoid racing with execution of the completion
* callback provided with qio_task_new, this method
* MUST ONLY be invoked from the thread that is
* running the main loop associated with @context
* parameter to qio_task_run_in_thread.
*
* When the thread has completed, the completion
* callback provided to qio_task_new will be invoked.
* When that callback returns @task will be freed,
* so @task must not be referenced after this
* method completes.
*/
void qio_task_wait_thread(QIOTask *task);


/**
* qio_task_complete:
* @task: the task struct
Expand Down
41 changes: 37 additions & 4 deletions io/task.c
Expand Up @@ -29,6 +29,7 @@ struct QIOTaskThreadData {
gpointer opaque;
GDestroyNotify destroy;
GMainContext *context;
GSource *completion;
};


Expand All @@ -40,6 +41,8 @@ struct QIOTask {
Error *err;
gpointer result;
GDestroyNotify destroyResult;
QemuMutex thread_lock;
QemuCond thread_cond;
struct QIOTaskThreadData *thread;
};

Expand All @@ -58,6 +61,8 @@ QIOTask *qio_task_new(Object *source,
task->func = func;
task->opaque = opaque;
task->destroy = destroy;
qemu_mutex_init(&task->thread_lock);
qemu_cond_init(&task->thread_cond);

trace_qio_task_new(task, source, func, opaque);

Expand All @@ -66,6 +71,7 @@ QIOTask *qio_task_new(Object *source,

static void qio_task_free(QIOTask *task)
{
qemu_mutex_lock(&task->thread_lock);
if (task->thread) {
if (task->thread->destroy) {
task->thread->destroy(task->thread->opaque);
Expand All @@ -89,6 +95,10 @@ static void qio_task_free(QIOTask *task)
}
object_unref(task->source);

qemu_mutex_unlock(&task->thread_lock);
qemu_mutex_destroy(&task->thread_lock);
qemu_cond_destroy(&task->thread_cond);

g_free(task);
}

Expand All @@ -107,7 +117,6 @@ static gboolean qio_task_thread_result(gpointer opaque)
static gpointer qio_task_thread_worker(gpointer opaque)
{
QIOTask *task = opaque;
GSource *idle;

trace_qio_task_thread_run(task);

Expand All @@ -120,9 +129,17 @@ static gpointer qio_task_thread_worker(gpointer opaque)
*/
trace_qio_task_thread_exit(task);

idle = g_idle_source_new();
g_source_set_callback(idle, qio_task_thread_result, task, NULL);
g_source_attach(idle, task->thread->context);
qemu_mutex_lock(&task->thread_lock);

task->thread->completion = g_idle_source_new();
g_source_set_callback(task->thread->completion,
qio_task_thread_result, task, NULL);
g_source_attach(task->thread->completion,
task->thread->context);
trace_qio_task_thread_source_attach(task, task->thread->completion);

qemu_cond_signal(&task->thread_cond);
qemu_mutex_unlock(&task->thread_lock);

return NULL;
}
Expand Down Expand Up @@ -157,6 +174,22 @@ void qio_task_run_in_thread(QIOTask *task,
}


void qio_task_wait_thread(QIOTask *task)
{
qemu_mutex_lock(&task->thread_lock);
g_assert(task->thread != NULL);
while (task->thread->completion == NULL) {
qemu_cond_wait(&task->thread_cond, &task->thread_lock);
}

trace_qio_task_thread_source_cancel(task, task->thread->completion);
g_source_destroy(task->thread->completion);
qemu_mutex_unlock(&task->thread_lock);

qio_task_thread_result(task);
}


void qio_task_complete(QIOTask *task)
{
task->func(task, task->opaque);
Expand Down
2 changes: 2 additions & 0 deletions io/trace-events
Expand Up @@ -7,6 +7,8 @@ qio_task_thread_start(void *task, void *worker, void *opaque) "Task thread start
qio_task_thread_run(void *task) "Task thread run task=%p"
qio_task_thread_exit(void *task) "Task thread exit task=%p"
qio_task_thread_result(void *task) "Task thread result task=%p"
qio_task_thread_source_attach(void *task, void *source) "Task thread source attach task=%p source=%p"
qio_task_thread_source_cancel(void *task, void *source) "Task thread source cancel task=%p source=%p"

# io/channel-socket.c
qio_channel_socket_new(void *ioc) "Socket new ioc=%p"
Expand Down

0 comments on commit dbb4450

Please sign in to comment.