
Commit

Merge remote-tracking branch 'remotes/stefanha/tags/block-pull-request' into staging

Pull request

Fixes for QEMU on aarch64 ARM hosts and fdmon-io_uring.

# gpg: Signature made Thu 09 Apr 2020 18:42:01 BST
# gpg:                using RSA key 8695A8BFD3F97CDAAC35775A9CA4ABB381AB73C8
# gpg: Good signature from "Stefan Hajnoczi <stefanha@redhat.com>" [full]
# gpg:                 aka "Stefan Hajnoczi <stefanha@gmail.com>" [full]
# Primary key fingerprint: 8695 A8BF D3F9 7CDA AC35  775A 9CA4 ABB3 81AB 73C8

* remotes/stefanha/tags/block-pull-request:
  async: use explicit memory barriers
  aio-wait: delegate polling of main AioContext if BQL not held
  aio-posix: signal-proof fdmon-io_uring

Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
pm215 committed Apr 9, 2020
2 parents 8bac3ba + 5710a3e commit 17e1e49
Showing 6 changed files with 80 additions and 30 deletions.
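
The heart of this pull request is the store-buffering pattern spelled out in the new comments below: aio_poll()/aio_ctx_prepare() write ctx->notify_me and then read bottom-half flags while computing the poll timeout, and aio_notify() writes bh->scheduled and then reads ctx->notify_me to decide whether a wakeup is needed. Each side now puts an explicit smp_mb() between its write and its read. As a standalone illustration of the pairing (a simplified C11 model, not QEMU code; notify_me, bh_scheduled and notifier_set stand in for ctx->notify_me, bh->scheduled and the EventNotifier):

/*
 * Simplified model of the notify_me / bh->scheduled pairing.
 * Illustration only, not QEMU code.
 */
#include <stdatomic.h>
#include <stdbool.h>

static atomic_int  notify_me;
static atomic_bool bh_scheduled;
static atomic_bool notifier_set;

/* aio_poll()/aio_ctx_prepare() side: announce that we may block, then look
 * for pending work while computing the timeout. */
static bool poller_sees_work(void)
{
    int v = atomic_load_explicit(&notify_me, memory_order_relaxed);
    atomic_store_explicit(&notify_me, v + 2, memory_order_relaxed);

    /* Write notify_me before reading bottom-half flags (smp_mb in the patch). */
    atomic_thread_fence(memory_order_seq_cst);

    return atomic_load_explicit(&bh_scheduled, memory_order_relaxed);
}

/* aio_notify() side: publish the work, then decide whether a wakeup is needed. */
static void notifier_publishes_work(void)
{
    atomic_store_explicit(&bh_scheduled, true, memory_order_relaxed);

    /* Write bh->scheduled before reading notify_me (smp_mb in aio_notify). */
    atomic_thread_fence(memory_order_seq_cst);

    if (atomic_load_explicit(&notify_me, memory_order_relaxed)) {
        atomic_store_explicit(&notifier_set, true, memory_order_relaxed);
    }
}

With both fences present, at least one side is guaranteed to observe the other's store, so either the poller recomputes its timeout or the notifier fires the wakeup; with either fence missing, both loads can return stale values on a weakly ordered host such as aarch64 and the event loop can hang.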
22 changes: 22 additions & 0 deletions include/block/aio-wait.h
@@ -26,6 +26,7 @@
#define QEMU_AIO_WAIT_H

#include "block/aio.h"
#include "qemu/main-loop.h"

/**
* AioWait:
@@ -124,4 +125,25 @@ void aio_wait_kick(void);
*/
void aio_wait_bh_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque);

/**
* in_aio_context_home_thread:
* @ctx: the aio context
*
* Return whether we are running in the thread that normally runs @ctx. Note
* that acquiring/releasing ctx does not affect the outcome, each AioContext
* still only has one home thread that is responsible for running it.
*/
static inline bool in_aio_context_home_thread(AioContext *ctx)
{
    if (ctx == qemu_get_current_aio_context()) {
        return true;
    }

    if (ctx == qemu_get_aio_context()) {
        return qemu_mutex_iothread_locked();
    } else {
        return false;
    }
}

#endif /* QEMU_AIO_WAIT_H */
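
Moving in_aio_context_home_thread() here (with the new BQL case for the main AioContext) gives waiters a single question to ask before polling a context themselves: am I the thread responsible for running ctx? A hypothetical caller, purely to illustrate the shape of that check (process_or_defer is made up for this sketch; aio_bh_schedule_oneshot() is an existing helper from block/aio.h):

#include "block/aio.h"
#include "block/aio-wait.h"

/* Illustration only, not part of the patch: run a callback right away if this
 * thread is allowed to drive ctx, otherwise hand it to ctx's home thread. */
static void process_or_defer(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
{
    if (in_aio_context_home_thread(ctx)) {
        cb(opaque);                               /* we are ctx's home thread */
    } else {
        aio_bh_schedule_oneshot(ctx, cb, opaque); /* let the home thread run it */
    }
}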
29 changes: 10 additions & 19 deletions include/block/aio.h
@@ -133,12 +133,16 @@ struct AioContext {
AioHandlerList deleted_aio_handlers;

/* Used to avoid unnecessary event_notifier_set calls in aio_notify;
* accessed with atomic primitives. If this field is 0, everything
* (file descriptors, bottom halves, timers) will be re-evaluated
* before the next blocking poll(), thus the event_notifier_set call
* can be skipped. If it is non-zero, you may need to wake up a
* concurrent aio_poll or the glib main event loop, making
* event_notifier_set necessary.
* only written from the AioContext home thread, or under the BQL in
* the case of the main AioContext. However, it is read from any
* thread so it is still accessed with atomic primitives.
*
* If this field is 0, everything (file descriptors, bottom halves,
* timers) will be re-evaluated before the next blocking poll() or
* io_uring wait; therefore, the event_notifier_set call can be
* skipped. If it is non-zero, you may need to wake up a concurrent
* aio_poll or the glib main event loop, making event_notifier_set
* necessary.
*
* Bit 0 is reserved for GSource usage of the AioContext, and is 1
* between a call to aio_ctx_prepare and the next call to aio_ctx_check.
@@ -681,19 +685,6 @@ void aio_co_enter(AioContext *ctx, struct Coroutine *co);
*/
AioContext *qemu_get_current_aio_context(void);

/**
* in_aio_context_home_thread:
* @ctx: the aio context
*
* Return whether we are running in the thread that normally runs @ctx. Note
* that acquiring/releasing ctx does not affect the outcome, each AioContext
* still only has one home thread that is responsible for running it.
*/
static inline bool in_aio_context_home_thread(AioContext *ctx)
{
    return ctx == qemu_get_current_aio_context();
}

/**
* aio_context_setup:
* @ctx: the aio context
16 changes: 14 additions & 2 deletions util/aio-posix.c
@@ -559,6 +559,11 @@ bool aio_poll(AioContext *ctx, bool blocking)
int64_t timeout;
int64_t start = 0;

/*
* There cannot be two concurrent aio_poll calls for the same AioContext (or
* an aio_poll concurrent with a GSource prepare/check/dispatch callback).
* We rely on this below to avoid slow locked accesses to ctx->notify_me.
*/
assert(in_aio_context_home_thread(ctx));

/* aio_notify can avoid the expensive event_notifier_set if
@@ -569,7 +574,13 @@ bool aio_poll(AioContext *ctx, bool blocking)
* so disable the optimization now.
*/
if (blocking) {
atomic_add(&ctx->notify_me, 2);
atomic_set(&ctx->notify_me, atomic_read(&ctx->notify_me) + 2);
/*
* Write ctx->notify_me before computing the timeout
* (reading bottom half flags, etc.). Pairs with
* smp_mb in aio_notify().
*/
smp_mb();
}

qemu_lockcnt_inc(&ctx->list_lock);
@@ -590,7 +601,8 @@ bool aio_poll(AioContext *ctx, bool blocking)
}

if (blocking) {
atomic_sub(&ctx->notify_me, 2);
/* Finish the poll before clearing the flag. */
atomic_store_release(&ctx->notify_me, atomic_read(&ctx->notify_me) - 2);
aio_notify_accept(ctx);
}

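
The two aio-posix.c hunks are deliberately asymmetric: entering the blocking section needs a full barrier, because a store followed by a load of a different location is exactly the reordering that release/acquire cannot prevent, while leaving it only needs a release store so the finished poll cannot move past the point where notify_me is dropped ("finish the poll before clearing the flag"). Restated as a compressed C11 sketch under those assumptions, not the actual QEMU helpers:

#include <stdatomic.h>

static atomic_int notify_me;   /* stands in for ctx->notify_me */

/* Enter the blocking section: plain store plus a full fence, mirroring
 * atomic_set() + smp_mb() above.  Only the home thread ever writes this. */
static void blocking_enter(void)
{
    int v = atomic_load_explicit(&notify_me, memory_order_relaxed);
    atomic_store_explicit(&notify_me, v + 2, memory_order_relaxed);
    atomic_thread_fence(memory_order_seq_cst);
}

/* Leave it: a release store is enough, mirroring atomic_store_release(). */
static void blocking_leave(void)
{
    int v = atomic_load_explicit(&notify_me, memory_order_relaxed);
    atomic_store_explicit(&notify_me, v - 2, memory_order_release);
}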
17 changes: 14 additions & 3 deletions util/aio-win32.c
@@ -321,6 +321,12 @@ bool aio_poll(AioContext *ctx, bool blocking)
int count;
int timeout;

/*
* There cannot be two concurrent aio_poll calls for the same AioContext (or
* an aio_poll concurrent with a GSource prepare/check/dispatch callback).
* We rely on this below to avoid slow locked accesses to ctx->notify_me.
*/
assert(in_aio_context_home_thread(ctx));
progress = false;

/* aio_notify can avoid the expensive event_notifier_set if
@@ -331,7 +337,13 @@ bool aio_poll(AioContext *ctx, bool blocking)
* so disable the optimization now.
*/
if (blocking) {
atomic_add(&ctx->notify_me, 2);
atomic_set(&ctx->notify_me, atomic_read(&ctx->notify_me) + 2);
/*
* Write ctx->notify_me before computing the timeout
* (reading bottom half flags, etc.). Pairs with
* smp_mb in aio_notify().
*/
smp_mb();
}

qemu_lockcnt_inc(&ctx->list_lock);
@@ -364,8 +376,7 @@ bool aio_poll(AioContext *ctx, bool blocking)
ret = WaitForMultipleObjects(count, events, FALSE, timeout);
if (blocking) {
assert(first);
assert(in_aio_context_home_thread(ctx));
atomic_sub(&ctx->notify_me, 2);
atomic_store_release(&ctx->notify_me, atomic_read(&ctx->notify_me) - 2);
aio_notify_accept(ctx);
}

16 changes: 12 additions & 4 deletions util/async.c
@@ -249,7 +249,14 @@ aio_ctx_prepare(GSource *source, gint *timeout)
{
AioContext *ctx = (AioContext *) source;

atomic_or(&ctx->notify_me, 1);
atomic_set(&ctx->notify_me, atomic_read(&ctx->notify_me) | 1);

/*
* Write ctx->notify_me before computing the timeout
* (reading bottom half flags, etc.). Pairs with
* smp_mb in aio_notify().
*/
smp_mb();

/* We assume there is no timeout already supplied */
*timeout = qemu_timeout_ns_to_ms(aio_compute_timeout(ctx));
@@ -268,7 +275,8 @@ aio_ctx_check(GSource *source)
QEMUBH *bh;
BHListSlice *s;

atomic_and(&ctx->notify_me, ~1);
/* Finish computing the timeout before clearing the flag. */
atomic_store_release(&ctx->notify_me, atomic_read(&ctx->notify_me) & ~1);
aio_notify_accept(ctx);

QSLIST_FOREACH_RCU(bh, &ctx->bh_list, next) {
@@ -411,10 +419,10 @@ LuringState *aio_get_linux_io_uring(AioContext *ctx)
void aio_notify(AioContext *ctx)
{
/* Write e.g. bh->scheduled before reading ctx->notify_me. Pairs
* with atomic_or in aio_ctx_prepare or atomic_add in aio_poll.
* with smp_mb in aio_ctx_prepare or aio_poll.
*/
smp_mb();
if (ctx->notify_me) {
if (atomic_read(&ctx->notify_me)) {
event_notifier_set(&ctx->notifier);
atomic_mb_set(&ctx->notified, true);
}
10 changes: 8 additions & 2 deletions util/fdmon-io_uring.c
@@ -88,7 +88,10 @@ static struct io_uring_sqe *get_sqe(AioContext *ctx)
}

/* No free sqes left, submit pending sqes first */
ret = io_uring_submit(ring);
do {
ret = io_uring_submit(ring);
} while (ret == -EINTR);

assert(ret > 1);
sqe = io_uring_get_sqe(ring);
assert(sqe);
@@ -282,7 +285,10 @@ static int fdmon_io_uring_wait(AioContext *ctx, AioHandlerList *ready_list,

fill_sq_ring(ctx);

ret = io_uring_submit_and_wait(&ctx->fdmon_io_uring, wait_nr);
do {
ret = io_uring_submit_and_wait(&ctx->fdmon_io_uring, wait_nr);
} while (ret == -EINTR);

assert(ret >= 0);

return process_cq_ring(ctx, ready_list);
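
The fdmon-io_uring.c hunks are the "signal-proof" part of the series: liburing's submit calls report interruption as -EINTR (a negative errno return value, not -1 with errno set), and without the retry loop that value would land in the assertions that follow. The idiom as a generic standalone helper (hypothetical, not the patch itself):

#include <errno.h>
#include <liburing.h>

/* Hypothetical helper, not part of the patch: retry a submit that was
 * interrupted by a signal. */
static int submit_retry_eintr(struct io_uring *ring)
{
    int ret;

    do {
        ret = io_uring_submit(ring);
    } while (ret == -EINTR);

    return ret;   /* >= 0: number of SQEs submitted; < 0: some other -errno */
}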
