Skip to content

Commit

Permalink
Merge remote-tracking branch 'remotes/stefanha/tags/block-pull-reques…
Browse files Browse the repository at this point in the history
…t' into staging

# gpg: Signature made Mon 16 Jan 2017 13:38:52 GMT
# gpg:                using RSA key 0x9CA4ABB381AB73C8
# gpg: Good signature from "Stefan Hajnoczi <stefanha@redhat.com>"
# gpg:                 aka "Stefan Hajnoczi <stefanha@gmail.com>"
# Primary key fingerprint: 8695 A8BF D3F9 7CDA AC35  775A 9CA4 ABB3 81AB 73C8

* remotes/stefanha/tags/block-pull-request:
  async: optimize aio_bh_poll
  aio: document locking
  aio-win32: remove walking_handlers, protecting AioHandler list with list_lock
  aio-posix: remove walking_handlers, protecting AioHandler list with list_lock
  aio: tweak walking in dispatch phase
  aio-posix: split aio_dispatch_handlers out of aio_dispatch
  qemu-thread: optimize QemuLockCnt with futexes on Linux
  aio: make ctx->list_lock a QemuLockCnt, subsuming ctx->walking_bh
  qemu-thread: introduce QemuLockCnt
  aio: rename bh_lock to list_lock
  block: get rid of bdrv_io_unplugged_begin/end

Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
  • Loading branch information
pm215 committed Jan 17, 2017
2 parents a8c611e + 7d506c9 commit 02b351d
Show file tree
Hide file tree
Showing 16 changed files with 1,009 additions and 202 deletions.
116 changes: 67 additions & 49 deletions aio-posix.c
Expand Up @@ -16,7 +16,7 @@
#include "qemu/osdep.h"
#include "qemu-common.h"
#include "block/block.h"
#include "qemu/queue.h"
#include "qemu/rcu_queue.h"
#include "qemu/sockets.h"
#include "qemu/cutils.h"
#include "trace.h"
Expand Down Expand Up @@ -66,7 +66,7 @@ static bool aio_epoll_try_enable(AioContext *ctx)
AioHandler *node;
struct epoll_event event;

QLIST_FOREACH(node, &ctx->aio_handlers, node) {
QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
int r;
if (node->deleted || !node->pfd.events) {
continue;
Expand Down Expand Up @@ -212,24 +212,27 @@ void aio_set_fd_handler(AioContext *ctx,
bool is_new = false;
bool deleted = false;

qemu_lockcnt_lock(&ctx->list_lock);

node = find_aio_handler(ctx, fd);

/* Are we deleting the fd handler? */
if (!io_read && !io_write && !io_poll) {
if (node == NULL) {
qemu_lockcnt_unlock(&ctx->list_lock);
return;
}

g_source_remove_poll(&ctx->source, &node->pfd);

/* If the lock is held, just mark the node as deleted */
if (ctx->walking_handlers) {
if (qemu_lockcnt_count(&ctx->list_lock)) {
node->deleted = 1;
node->pfd.revents = 0;
} else {
/* Otherwise, delete it for real. We can't just mark it as
* deleted because deleted nodes are only cleaned up after
* releasing the walking_handlers lock.
* deleted because deleted nodes are only cleaned up while
* no one is walking the handlers list.
*/
QLIST_REMOVE(node, node);
deleted = true;
Expand All @@ -243,7 +246,7 @@ void aio_set_fd_handler(AioContext *ctx,
/* Alloc and insert if it's not already there */
node = g_new0(AioHandler, 1);
node->pfd.fd = fd;
QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);

g_source_add_poll(&ctx->source, &node->pfd);
is_new = true;
Expand All @@ -265,6 +268,7 @@ void aio_set_fd_handler(AioContext *ctx,
}

aio_epoll_update(ctx, node, is_new);
qemu_lockcnt_unlock(&ctx->list_lock);
aio_notify(ctx);

if (deleted) {
Expand Down Expand Up @@ -316,8 +320,8 @@ static void poll_set_started(AioContext *ctx, bool started)

ctx->poll_started = started;

ctx->walking_handlers++;
QLIST_FOREACH(node, &ctx->aio_handlers, node) {
qemu_lockcnt_inc(&ctx->list_lock);
QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
IOHandler *fn;

if (node->deleted) {
Expand All @@ -334,7 +338,7 @@ static void poll_set_started(AioContext *ctx, bool started)
fn(node->opaque);
}
}
ctx->walking_handlers--;
qemu_lockcnt_dec(&ctx->list_lock);
}


Expand All @@ -349,54 +353,47 @@ bool aio_prepare(AioContext *ctx)
bool aio_pending(AioContext *ctx)
{
AioHandler *node;
bool result = false;

QLIST_FOREACH(node, &ctx->aio_handlers, node) {
/*
* We have to walk very carefully in case aio_set_fd_handler is
* called while we're walking.
*/
qemu_lockcnt_inc(&ctx->list_lock);

QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
int revents;

revents = node->pfd.revents & node->pfd.events;
if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read &&
aio_node_check(ctx, node->is_external)) {
return true;
result = true;
break;
}
if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write &&
aio_node_check(ctx, node->is_external)) {
return true;
result = true;
break;
}
}
qemu_lockcnt_dec(&ctx->list_lock);

return false;
return result;
}

/*
* Note that dispatch_fds == false has the side-effect of post-poning the
* freeing of deleted handlers.
*/
bool aio_dispatch(AioContext *ctx, bool dispatch_fds)
static bool aio_dispatch_handlers(AioContext *ctx)
{
AioHandler *node = NULL;
AioHandler *node, *tmp;
bool progress = false;

/*
* If there are callbacks left that have been queued, we need to call them.
* Do not call select in this case, because it is possible that the caller
* does not need a complete flush (as is the case for aio_poll loops).
*/
if (aio_bh_poll(ctx)) {
progress = true;
}

/*
* We have to walk very carefully in case aio_set_fd_handler is
* called while we're walking.
*/
if (dispatch_fds) {
node = QLIST_FIRST(&ctx->aio_handlers);
}
while (node) {
AioHandler *tmp;
int revents;
qemu_lockcnt_inc(&ctx->list_lock);

ctx->walking_handlers++;
QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
int revents;

revents = node->pfd.revents & node->pfd.events;
node->pfd.revents = 0;
Expand All @@ -420,15 +417,36 @@ bool aio_dispatch(AioContext *ctx, bool dispatch_fds)
progress = true;
}

tmp = node;
node = QLIST_NEXT(node, node);
if (node->deleted) {
if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
QLIST_REMOVE(node, node);
g_free(node);
qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
}
}
}

ctx->walking_handlers--;
qemu_lockcnt_dec(&ctx->list_lock);
return progress;
}

if (!ctx->walking_handlers && tmp->deleted) {
QLIST_REMOVE(tmp, node);
g_free(tmp);
}
/*
* Note that dispatch_fds == false has the side-effect of post-poning the
* freeing of deleted handlers.
*/
bool aio_dispatch(AioContext *ctx, bool dispatch_fds)
{
bool progress;

/*
* If there are callbacks left that have been queued, we need to call them.
* Do not call select in this case, because it is possible that the caller
* does not need a complete flush (as is the case for aio_poll loops).
*/
progress = aio_bh_poll(ctx);

if (dispatch_fds) {
progress |= aio_dispatch_handlers(ctx);
}

/* Run our timers */
Expand Down Expand Up @@ -488,7 +506,7 @@ static bool run_poll_handlers_once(AioContext *ctx)
bool progress = false;
AioHandler *node;

QLIST_FOREACH(node, &ctx->aio_handlers, node) {
QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
if (!node->deleted && node->io_poll &&
node->io_poll(node->opaque)) {
progress = true;
Expand All @@ -509,7 +527,7 @@ static bool run_poll_handlers_once(AioContext *ctx)
* Note that ctx->notify_me must be non-zero so this function can detect
* aio_notify().
*
* Note that the caller must have incremented ctx->walking_handlers.
* Note that the caller must have incremented ctx->list_lock.
*
* Returns: true if progress was made, false otherwise
*/
Expand All @@ -519,7 +537,7 @@ static bool run_poll_handlers(AioContext *ctx, int64_t max_ns)
int64_t end_time;

assert(ctx->notify_me);
assert(ctx->walking_handlers > 0);
assert(qemu_lockcnt_count(&ctx->list_lock) > 0);
assert(ctx->poll_disable_cnt == 0);

trace_run_poll_handlers_begin(ctx, max_ns);
Expand All @@ -541,7 +559,7 @@ static bool run_poll_handlers(AioContext *ctx, int64_t max_ns)
*
* ctx->notify_me must be non-zero so this function can detect aio_notify().
*
* Note that the caller must have incremented ctx->walking_handlers.
* Note that the caller must have incremented ctx->list_lock.
*
* Returns: true if progress was made, false otherwise
*/
Expand Down Expand Up @@ -592,7 +610,7 @@ bool aio_poll(AioContext *ctx, bool blocking)
atomic_add(&ctx->notify_me, 2);
}

ctx->walking_handlers++;
qemu_lockcnt_inc(&ctx->list_lock);

if (ctx->poll_max_ns) {
start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
Expand All @@ -606,7 +624,7 @@ bool aio_poll(AioContext *ctx, bool blocking)
/* fill pollfds */

if (!aio_epoll_enabled(ctx)) {
QLIST_FOREACH(node, &ctx->aio_handlers, node) {
QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
if (!node->deleted && node->pfd.events
&& aio_node_check(ctx, node->is_external)) {
add_pollfd(node);
Expand Down Expand Up @@ -691,7 +709,7 @@ bool aio_poll(AioContext *ctx, bool blocking)
}

npfd = 0;
ctx->walking_handlers--;
qemu_lockcnt_dec(&ctx->list_lock);

/* Run dispatch even if there were no readable fds to run timers */
if (aio_dispatch(ctx, ret > 0)) {
Expand Down

0 comments on commit 02b351d

Please sign in to comment.