AioContext: fix broken ctx->dispatching optimization
manually backport from:

commit eabc977
Author: Paolo Bonzini <pbonzini@redhat.com>
Date:   Tue Jul 21 16:07:51 2015 +0200

Signed-off-by: Peter Lieven <pl@kamp.de>
plieven committed Sep 3, 2015
1 parent 62f0964 commit 0ddcdc6
Showing 3 changed files with 40 additions and 28 deletions.
19 changes: 9 additions & 10 deletions aio-posix.c
@@ -189,24 +189,21 @@ bool aio_dispatch(AioContext *ctx)
 bool aio_poll(AioContext *ctx, bool blocking)
 {
     AioHandler *node;
-    bool was_dispatching;
     int ret;
     bool progress;
 
-    was_dispatching = ctx->dispatching;
     progress = false;
 
     /* aio_notify can avoid the expensive event_notifier_set if
      * everything (file descriptors, bottom halves, timers) will
      * be re-evaluated before the next blocking poll(). This is
      * already true when aio_poll is called with blocking == false;
-     * if blocking == true, it is only true after poll() returns.
-     *
-     * If we're in a nested event loop, ctx->dispatching might be true.
-     * In that case we can restore it just before returning, but we
-     * have to clear it now.
+     * if blocking == true, it is only true after poll() returns,
+     * so disable the optimization now.
      */
-    aio_set_dispatching(ctx, !blocking);
+    if (blocking) {
+        atomic_add(&ctx->notify_me, 2);
+    }
 
     ctx->walking_handlers++;
 
@@ -232,6 +229,10 @@ bool aio_poll(AioContext *ctx, bool blocking)
                        ctx->pollfds->len,
                        blocking ? aio_compute_timeout(ctx) : 0);
 
+    if (blocking) {
+        atomic_sub(&ctx->notify_me, 2);
+    }
+
     /* if we have any readable fds, dispatch event */
     if (ret > 0) {
         QLIST_FOREACH(node, &ctx->aio_handlers, node) {
@@ -244,11 +245,9 @@ bool aio_poll(AioContext *ctx, bool blocking)
     }
 
     /* Run dispatch even if there were no readable fds to run timers */
-    aio_set_dispatching(ctx, true);
     if (aio_dispatch(ctx)) {
         progress = true;
     }
 
-    aio_set_dispatching(ctx, was_dispatching);
     return progress;
 }
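
Note: the aio-posix.c change above replaces the ctx->dispatching flag with a counter. A blocking aio_poll() adds 2 to ctx->notify_me before calling poll() and subtracts 2 afterwards, so aio_notify() only needs to write to the event notifier while another thread might actually be blocked. Below is a minimal standalone sketch of that handshake, not QEMU code: it assumes C11 atomics, a POSIX pipe standing in for the EventNotifier, and invented names such as sketch_aio_notify().

/* build: cc -pthread notify_me_sketch.c */
#include <pthread.h>
#include <poll.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <unistd.h>

static atomic_uint notify_me;      /* plays the role of ctx->notify_me */
static atomic_bool bh_scheduled;   /* plays the role of bh->scheduled */
static int wakeup_pipe[2];         /* plays the role of ctx->notifier */

/* Notifier side: publish the work, then wake the poller only if it
 * might be blocked (notify_me != 0). */
static void sketch_aio_notify(void)
{
    atomic_store(&bh_scheduled, true);
    if (atomic_load(&notify_me)) {
        (void)write(wakeup_pipe[1], "x", 1);
    }
}

/* Poller side: advertise the blocking phase with +2, re-check for work
 * that may have been published meanwhile, then block. */
static void *poller(void *arg)
{
    struct pollfd pfd = { .fd = wakeup_pipe[0], .events = POLLIN };

    (void)arg;
    atomic_fetch_add(&notify_me, 2);
    if (!atomic_load(&bh_scheduled)) {
        poll(&pfd, 1, -1);                 /* blocks until notified */
    }
    atomic_fetch_sub(&notify_me, 2);
    printf("poller woke up, bh_scheduled=%d\n", (int)atomic_load(&bh_scheduled));
    return NULL;
}

int main(void)
{
    pthread_t t;

    if (pipe(wakeup_pipe) != 0) {
        return 1;
    }
    pthread_create(&t, NULL, poller, NULL);
    usleep(100 * 1000);        /* let the poller reach poll() */
    sketch_aio_notify();       /* publish work and wake it */
    pthread_join(t, NULL);
    return 0;
}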
23 changes: 8 additions & 15 deletions async.c
@@ -182,6 +182,8 @@ aio_ctx_prepare(GSource *source, gint *timeout)
 {
     AioContext *ctx = (AioContext *) source;
 
+    atomic_or(&ctx->notify_me, 1);
+
     /* We assume there is no timeout already supplied */
     *timeout = qemu_timeout_ns_to_ms(aio_compute_timeout(ctx));
 
@@ -198,10 +200,11 @@ aio_ctx_check(GSource *source)
     AioContext *ctx = (AioContext *) source;
     QEMUBH *bh;
 
+    atomic_and(&ctx->notify_me, ~1);
     for (bh = ctx->first_bh; bh; bh = bh->next) {
         if (!bh->deleted && bh->scheduled) {
             return true;
         }
     }
     return aio_pending(ctx) || (timerlistgroup_deadline_ns(&ctx->tlg) == 0);
 }
@@ -253,23 +256,13 @@ ThreadPool *aio_get_thread_pool(AioContext *ctx)
     return ctx->thread_pool;
 }
 
-void aio_set_dispatching(AioContext *ctx, bool dispatching)
-{
-    ctx->dispatching = dispatching;
-    if (!dispatching) {
-        /* Write ctx->dispatching before reading e.g. bh->scheduled.
-         * Optimization: this is only needed when we're entering the "unsafe"
-         * phase where other threads must call event_notifier_set.
-         */
-        smp_mb();
-    }
-}
-
 void aio_notify(AioContext *ctx)
 {
-    /* Write e.g. bh->scheduled before reading ctx->dispatching. */
+    /* Write e.g. bh->scheduled before reading ctx->notify_me. Pairs
+     * with atomic_or in aio_ctx_prepare or atomic_add in aio_poll.
+     */
     smp_mb();
-    if (!ctx->dispatching) {
+    if (ctx->notify_me) {
         event_notifier_set(&ctx->notifier);
     }
 }
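
Note: the new comment in aio_notify() above describes a Dekker-style pairing. The notifier writes bh->scheduled and then reads ctx->notify_me; aio_ctx_prepare()/aio_poll() write ctx->notify_me and then re-check for pending work, so at least one side always observes the other's write and a wakeup can never be lost. A compact sketch of that ordering argument, assuming C11 atomics with their default sequentially consistent ordering in place of QEMU's smp_mb(); the names mirror the QEMU fields but the code is illustrative only.

#include <stdatomic.h>
#include <stdbool.h>

static atomic_uint notify_me;     /* ctx->notify_me */
static atomic_bool scheduled;     /* e.g. bh->scheduled */
static atomic_bool woken;         /* stands in for event_notifier_set() */

/* aio_notify(): write scheduled, then read notify_me.  Sequential
 * consistency here takes the place of the smp_mb() in the patch. */
static void notify_side(void)
{
    atomic_store(&scheduled, true);
    if (atomic_load(&notify_me)) {
        atomic_store(&woken, true);
    }
}

/* aio_ctx_prepare()/aio_poll(): write notify_me (atomic_or / atomic_add),
 * then re-check for pending work before deciding to block.  With both
 * sides ordered this way, "notify_side skips the wakeup" and
 * "poll_side decides to block" cannot both happen. */
static bool poll_side_may_block(void)
{
    atomic_fetch_or(&notify_me, 1);        /* or atomic_fetch_add(..., 2) */
    return !atomic_load(&scheduled);
}

int main(void)
{
    bool blocked = poll_side_may_block();
    notify_side();
    return blocked && !atomic_load(&woken);  /* 0: no lost wakeup here */
}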
26 changes: 23 additions & 3 deletions include/block/aio.h
@@ -63,10 +63,30 @@ struct AioContext {
      */
     int walking_handlers;
 
-    /* Used to avoid unnecessary event_notifier_set calls in aio_notify.
-     * Writes protected by lock or BQL, reads are lockless.
+    /* Used to avoid unnecessary event_notifier_set calls in aio_notify;
+     * accessed with atomic primitives. If this field is 0, everything
+     * (file descriptors, bottom halves, timers) will be re-evaluated
+     * before the next blocking poll(), thus the event_notifier_set call
+     * can be skipped. If it is non-zero, you may need to wake up a
+     * concurrent aio_poll or the glib main event loop, making
+     * event_notifier_set necessary.
+     *
+     * Bit 0 is reserved for GSource usage of the AioContext, and is 1
+     * between a call to aio_ctx_check and the next call to aio_ctx_dispatch.
+     * Bits 1-31 simply count the number of active calls to aio_poll
+     * that are in the prepare or poll phase.
+     *
+     * The GSource and aio_poll must use a different mechanism because
+     * there is no certainty that a call to GSource's prepare callback
+     * (via g_main_context_prepare) is indeed followed by check and
+     * dispatch. It's not clear whether this would be a bug, but let's
+     * play safe and allow it---it will just cause extra calls to
+     * event_notifier_set until the next call to dispatch.
+     *
+     * Instead, the aio_poll calls include both the prepare and the
+     * dispatch phase, hence a simple counter is enough for them.
      */
-    bool dispatching;
+    uint32_t notify_me;
 
     /* lock to protect between bh's adders and deleter */
     QemuMutex bh_lock;
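
Note: the comment added to include/block/aio.h above packs two users into one field: bit 0 is owned by the GSource (set in aio_ctx_prepare, cleared in aio_ctx_check) and bits 1-31 count blocking aio_poll() callers, each contributing 2. A small self-contained sketch of that encoding, assuming C11 atomics; the helper names are invented for illustration.

#include <assert.h>
#include <stdatomic.h>

static atomic_uint notify_me;

static void gsource_prepare(void) { atomic_fetch_or(&notify_me, 1); }    /* aio_ctx_prepare */
static void gsource_check(void)   { atomic_fetch_and(&notify_me, ~1u); } /* aio_ctx_check   */
static void poll_enter(void)      { atomic_fetch_add(&notify_me, 2); }   /* blocking aio_poll, before poll() */
static void poll_exit(void)       { atomic_fetch_sub(&notify_me, 2); }   /* blocking aio_poll, after poll()  */

int main(void)
{
    /* Two blocking aio_poll callers plus the glib loop in its
     * prepare->check window: 2 + 2 + 1 = 5, so aio_notify must wake. */
    poll_enter();
    poll_enter();
    gsource_prepare();
    assert(atomic_load(&notify_me) == 5);

    gsource_check();
    poll_exit();
    poll_exit();
    assert(atomic_load(&notify_me) == 0);   /* wakeups can be skipped again */
    return 0;
}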
