
Commit

Merge remote-tracking branch 'remotes/stefanha/tags/block-pull-request' into staging

# gpg: Signature made Wed Jul 22 12:43:35 2015 BST using RSA key ID 81AB73C8
# gpg: Good signature from "Stefan Hajnoczi <stefanha@redhat.com>"
# gpg:                 aka "Stefan Hajnoczi <stefanha@gmail.com>"

* remotes/stefanha/tags/block-pull-request:
  AioContext: optimize clearing the EventNotifier
  AioContext: fix broken placement of event_notifier_test_and_clear
  AioContext: fix broken ctx->dispatching optimization
  aio-win32: reorganize polling loop
  tests: remove irrelevant assertions from test-aio
  qemu-timer: initialize "timers_done_ev" to set
  mirror: Speed up bitmap initial scanning

Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
pm215 committed Jul 22, 2015
2 parents b9c4630 + 05e514b commit dc94bd9
Showing 10 changed files with 447 additions and 128 deletions.
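
For orientation: the core of the series above retires the old ctx->dispatching flag in favor of a ctx->notify_me counter plus a latched ctx->notified flag, so that aio_notify() only pays for a wakeup when some poller is actually about to block. Below is a minimal model of the producer side in plain C11 atomics; it is illustrative only, and Ctx, ctx_notify and ctx_notify_accept are stand-ins, not the QEMU API.

#include <stdatomic.h>
#include <stdbool.h>

typedef struct Ctx {
    atomic_int  notify_me;   /* pollers add 2 (or OR in 1) before blocking */
    atomic_bool notified;    /* latched wakeup, consumed by the poller */
} Ctx;

/* Producer: publish new work, then wake a poller only if one may be
 * blocked.  Models aio_notify() in async.c below. */
static void ctx_notify(Ctx *ctx)
{
    /* Order "write work item" before "read notify_me" (smp_mb() in QEMU). */
    atomic_thread_fence(memory_order_seq_cst);
    if (atomic_load(&ctx->notify_me)) {
        atomic_store(&ctx->notified, true);  /* stands in for event_notifier_set() */
    }
}

/* Poller: consume a pending wakeup exactly once.  Models aio_notify_accept(). */
static bool ctx_notify_accept(Ctx *ctx)
{
    return atomic_exchange(&ctx->notified, false);
}
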
20 changes: 10 additions & 10 deletions aio-posix.c
@@ -233,26 +233,23 @@ static void add_pollfd(AioHandler *node)
bool aio_poll(AioContext *ctx, bool blocking)
{
AioHandler *node;
bool was_dispatching;
int i, ret;
bool progress;
int64_t timeout;

aio_context_acquire(ctx);
was_dispatching = ctx->dispatching;
progress = false;

/* aio_notify can avoid the expensive event_notifier_set if
* everything (file descriptors, bottom halves, timers) will
* be re-evaluated before the next blocking poll(). This is
* already true when aio_poll is called with blocking == false;
* if blocking == true, it is only true after poll() returns.
*
* If we're in a nested event loop, ctx->dispatching might be true.
* In that case we can restore it just before returning, but we
* have to clear it now.
* if blocking == true, it is only true after poll() returns,
* so disable the optimization now.
*/
aio_set_dispatching(ctx, !blocking);
if (blocking) {
atomic_add(&ctx->notify_me, 2);
}

ctx->walking_handlers++;

@@ -272,10 +269,15 @@ bool aio_poll(AioContext *ctx, bool blocking)
aio_context_release(ctx);
}
ret = qemu_poll_ns((GPollFD *)pollfds, npfd, timeout);
if (blocking) {
atomic_sub(&ctx->notify_me, 2);
}
if (timeout) {
aio_context_acquire(ctx);
}

aio_notify_accept(ctx);

/* if we have any readable fds, dispatch event */
if (ret > 0) {
for (i = 0; i < npfd; i++) {
@@ -287,12 +289,10 @@ bool aio_poll(AioContext *ctx, bool blocking)
ctx->walking_handlers--;

/* Run dispatch even if there were no readable fds to run timers */
aio_set_dispatching(ctx, true);
if (aio_dispatch(ctx)) {
progress = true;
}

aio_set_dispatching(ctx, was_dispatching);
aio_context_release(ctx);

return progress;
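
The aio-posix.c hunks are the poller's half of that handshake: advertise the intent to block, poll, withdraw the advertisement, then drain the wakeup before dispatching handlers (the placement this series fixes). Continuing the simplified model sketched above, with poll(2) standing in for qemu_poll_ns():

#include <poll.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Illustrative poller, reusing the Ctx model from the earlier sketch. */
static bool ctx_poll(Ctx *ctx, struct pollfd *fds, int nfds, bool blocking)
{
    if (blocking) {
        /* Adding 2 keeps bit 0 free for the glib prepare/check path (async.c). */
        atomic_fetch_add(&ctx->notify_me, 2);
    }

    int ret = poll(fds, nfds, blocking ? -1 : 0);

    if (blocking) {
        atomic_fetch_sub(&ctx->notify_me, 2);
    }

    /* Drain any wakeup that arrived around the poll, before running handlers. */
    ctx_notify_accept(ctx);

    return ret > 0;   /* caller dispatches ready fds, then timers */
}
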
48 changes: 26 additions & 22 deletions aio-win32.c
@@ -279,30 +279,25 @@ bool aio_poll(AioContext *ctx, bool blocking)
{
AioHandler *node;
HANDLE events[MAXIMUM_WAIT_OBJECTS + 1];
bool was_dispatching, progress, have_select_revents, first;
bool progress, have_select_revents, first;
int count;
int timeout;

aio_context_acquire(ctx);
have_select_revents = aio_prepare(ctx);
if (have_select_revents) {
blocking = false;
}

was_dispatching = ctx->dispatching;
progress = false;

/* aio_notify can avoid the expensive event_notifier_set if
* everything (file descriptors, bottom halves, timers) will
* be re-evaluated before the next blocking poll(). This is
* already true when aio_poll is called with blocking == false;
* if blocking == true, it is only true after poll() returns.
*
* If we're in a nested event loop, ctx->dispatching might be true.
* In that case we can restore it just before returning, but we
* have to clear it now.
* if blocking == true, it is only true after poll() returns,
* so disable the optimization now.
*/
aio_set_dispatching(ctx, !blocking);
if (blocking) {
atomic_add(&ctx->notify_me, 2);
}

have_select_revents = aio_prepare(ctx);

ctx->walking_handlers++;

@@ -317,26 +312,36 @@ bool aio_poll(AioContext *ctx, bool blocking)
ctx->walking_handlers--;
first = true;

/* wait until next event */
while (count > 0) {
/* ctx->notifier is always registered. */
assert(count > 0);

/* Multiple iterations, all of them non-blocking except the first,
* may be necessary to process all pending events. After the first
* WaitForMultipleObjects call ctx->notify_me will be decremented.
*/
do {
HANDLE event;
int ret;

timeout = blocking
timeout = blocking && !have_select_revents
? qemu_timeout_ns_to_ms(aio_compute_timeout(ctx)) : 0;
if (timeout) {
aio_context_release(ctx);
}
ret = WaitForMultipleObjects(count, events, FALSE, timeout);
if (blocking) {
assert(first);
atomic_sub(&ctx->notify_me, 2);
}
if (timeout) {
aio_context_acquire(ctx);
}
aio_set_dispatching(ctx, true);

if (first && aio_bh_poll(ctx)) {
progress = true;
if (first) {
aio_notify_accept(ctx);
progress |= aio_bh_poll(ctx);
first = false;
}
first = false;

/* if we have any signaled events, dispatch event */
event = NULL;
@@ -351,11 +356,10 @@ bool aio_poll(AioContext *ctx, bool blocking)
blocking = false;

progress |= aio_dispatch_handlers(ctx, event);
}
} while (count > 0);

progress |= timerlistgroup_run_timers(&ctx->tlg);

aio_set_dispatching(ctx, was_dispatching);
aio_context_release(ctx);
return progress;
}
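
The reorganized Windows loop above blocks at most once, then keeps iterating with a zero timeout until every signaled handle has been dispatched, decrementing notify_me right after the first wait. A schematic of that control flow follows, with wait_one() and dispatch_event() as hypothetical stand-ins for WaitForMultipleObjects() and aio_dispatch_handlers():

#include <stdbool.h>

/* Hypothetical helpers: wait_one() returns the index of a signaled event
 * or -1 on timeout; dispatch_event() runs its handler and reports whether
 * it made progress.  Error handling is omitted. */
int wait_one(int count, int timeout_ms);
bool dispatch_event(int index);

static bool drain_events(int count, bool blocking)
{
    bool progress = false;
    bool first = true;

    do {
        /* Only the first iteration may block; later ones use a zero
         * timeout so already-signaled events are still dispatched. */
        int index = wait_one(count, (blocking && first) ? -1 : 0);
        first = false;

        if (index < 0) {
            break;                      /* timed out, nothing pending */
        }
        progress |= dispatch_event(index);
        count--;                        /* the handled event is removed, as in the diff */
    } while (count > 0);

    return progress;
}
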
35 changes: 21 additions & 14 deletions async.c
@@ -184,6 +184,8 @@ aio_ctx_prepare(GSource *source, gint *timeout)
{
AioContext *ctx = (AioContext *) source;

atomic_or(&ctx->notify_me, 1);

/* We assume there is no timeout already supplied */
*timeout = qemu_timeout_ns_to_ms(aio_compute_timeout(ctx));

@@ -200,6 +202,9 @@ aio_ctx_check(GSource *source)
AioContext *ctx = (AioContext *) source;
QEMUBH *bh;

atomic_and(&ctx->notify_me, ~1);
aio_notify_accept(ctx);

for (bh = ctx->first_bh; bh; bh = bh->next) {
if (!bh->deleted && bh->scheduled) {
return true;
@@ -254,24 +254,22 @@ ThreadPool *aio_get_thread_pool(AioContext *ctx)
return ctx->thread_pool;
}

void aio_set_dispatching(AioContext *ctx, bool dispatching)
void aio_notify(AioContext *ctx)
{
ctx->dispatching = dispatching;
if (!dispatching) {
/* Write ctx->dispatching before reading e.g. bh->scheduled.
* Optimization: this is only needed when we're entering the "unsafe"
* phase where other threads must call event_notifier_set.
*/
smp_mb();
/* Write e.g. bh->scheduled before reading ctx->notify_me. Pairs
* with atomic_or in aio_ctx_prepare or atomic_add in aio_poll.
*/
smp_mb();
if (ctx->notify_me) {
event_notifier_set(&ctx->notifier);
atomic_mb_set(&ctx->notified, true);
}
}

void aio_notify(AioContext *ctx)
void aio_notify_accept(AioContext *ctx)
{
/* Write e.g. bh->scheduled before reading ctx->dispatching. */
smp_mb();
if (!ctx->dispatching) {
event_notifier_set(&ctx->notifier);
if (atomic_xchg(&ctx->notified, false)) {
event_notifier_test_and_clear(&ctx->notifier);
}
}

@@ -286,6 +289,10 @@ static void aio_rfifolock_cb(void *opaque)
aio_notify(opaque);
}

static void event_notifier_dummy_cb(EventNotifier *e)
{
}

AioContext *aio_context_new(Error **errp)
{
int ret;
@@ -300,7 +307,7 @@ AioContext *aio_context_new(Error **errp)
g_source_set_can_recurse(&ctx->source, true);
aio_set_event_notifier(ctx, &ctx->notifier,
(EventNotifierHandler *)
event_notifier_test_and_clear);
event_notifier_dummy_cb);
ctx->thread_pool = NULL;
qemu_mutex_init(&ctx->bh_lock);
rfifolock_init(&ctx->lock, aio_rfifolock_cb, ctx);
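
The async.c changes wire the same counter into the GSource callbacks: prepare() sets bit 0 of notify_me before glib polls, check() clears it and drains the notifier, and aio_context_new() no longer needs the notifier handler to clear the event itself. In the same simplified model (again not the real QEMU or glib API):

/* Called before the main loop polls this source; models aio_ctx_prepare(). */
static void ctx_prepare(Ctx *ctx)
{
    atomic_fetch_or(&ctx->notify_me, 1);   /* bit 0: "glib may block on us" */
}

/* Called after the poll returns; models aio_ctx_check(). */
static bool ctx_check(Ctx *ctx)
{
    atomic_fetch_and(&ctx->notify_me, ~1);
    /* Draining only when a notification was actually latched is the
     * "optimize clearing the EventNotifier" part of the series. */
    return ctx_notify_accept(ctx);
}
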
14 changes: 6 additions & 8 deletions block/mirror.c
@@ -388,7 +388,7 @@ static void coroutine_fn mirror_run(void *opaque)
MirrorBlockJob *s = opaque;
MirrorExitData *data;
BlockDriverState *bs = s->common.bs;
int64_t sector_num, end, sectors_per_chunk, length;
int64_t sector_num, end, length;
uint64_t last_pause_ns;
BlockDriverInfo bdi;
char backing_filename[2]; /* we only need 2 characters because we are only
@@ -442,15 +442,16 @@ static void coroutine_fn mirror_run(void *opaque)
goto immediate_exit;
}

sectors_per_chunk = s->granularity >> BDRV_SECTOR_BITS;
mirror_free_init(s);

last_pause_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
if (!s->is_none_mode) {
/* First part, loop on the sectors and initialize the dirty bitmap. */
BlockDriverState *base = s->base;
for (sector_num = 0; sector_num < end; ) {
int64_t next = (sector_num | (sectors_per_chunk - 1)) + 1;
/* Just to make sure we are not exceeding int limit. */
int nb_sectors = MIN(INT_MAX >> BDRV_SECTOR_BITS,
end - sector_num);
int64_t now = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);

if (now - last_pause_ns > SLICE_TIME) {
@@ -462,8 +463,7 @@ static void coroutine_fn mirror_run(void *opaque)
goto immediate_exit;
}

ret = bdrv_is_allocated_above(bs, base,
sector_num, next - sector_num, &n);
ret = bdrv_is_allocated_above(bs, base, sector_num, nb_sectors, &n);

if (ret < 0) {
goto immediate_exit;
@@ -472,10 +472,8 @@ static void coroutine_fn mirror_run(void *opaque)
assert(n > 0);
if (ret == 1) {
bdrv_set_dirty_bitmap(s->dirty_bitmap, sector_num, n);
sector_num = next;
} else {
sector_num += n;
}
sector_num += n;
}
}

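
Finally, the mirror.c hunk speeds up the initial dirty-bitmap scan by querying allocation status for the largest range that fits in an int worth of bytes, rather than one granularity-sized chunk per iteration. A schematic of the new loop shape, assuming 512-byte sectors, with is_allocated() and set_dirty() as hypothetical stand-ins for bdrv_is_allocated_above() and bdrv_set_dirty_bitmap() (error handling omitted):

#include <limits.h>
#include <stdint.h>

#define SECTOR_BITS 9                       /* BDRV_SECTOR_BITS */
#define MIN(a, b)   ((a) < (b) ? (a) : (b))

/* Stand-ins: is_allocated() returns 1 or 0 and stores in *pnum how many
 * sectors starting at sector_num share that answer. */
int  is_allocated(int64_t sector_num, int nb_sectors, int *pnum);
void set_dirty(int64_t sector_num, int nb_sectors);

static void init_dirty_bitmap(int64_t end)
{
    for (int64_t sector_num = 0; sector_num < end; ) {
        /* Ask about as many sectors as fit in an int worth of bytes. */
        int nb_sectors = MIN(INT_MAX >> SECTOR_BITS, end - sector_num);
        int n;

        if (is_allocated(sector_num, nb_sectors, &n) == 1) {
            set_dirty(sector_num, n);       /* mark allocated range dirty */
        }
        sector_num += n;                    /* advance by the answered amount */
    }
}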
