diff --git a/aio-posix.c b/aio-posix.c index 15855715d45a..9453d8374331 100644 --- a/aio-posix.c +++ b/aio-posix.c @@ -16,7 +16,7 @@ #include "qemu/osdep.h" #include "qemu-common.h" #include "block/block.h" -#include "qemu/queue.h" +#include "qemu/rcu_queue.h" #include "qemu/sockets.h" #include "qemu/cutils.h" #include "trace.h" @@ -66,7 +66,7 @@ static bool aio_epoll_try_enable(AioContext *ctx) AioHandler *node; struct epoll_event event; - QLIST_FOREACH(node, &ctx->aio_handlers, node) { + QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { int r; if (node->deleted || !node->pfd.events) { continue; @@ -212,24 +212,27 @@ void aio_set_fd_handler(AioContext *ctx, bool is_new = false; bool deleted = false; + qemu_lockcnt_lock(&ctx->list_lock); + node = find_aio_handler(ctx, fd); /* Are we deleting the fd handler? */ if (!io_read && !io_write && !io_poll) { if (node == NULL) { + qemu_lockcnt_unlock(&ctx->list_lock); return; } g_source_remove_poll(&ctx->source, &node->pfd); /* If the lock is held, just mark the node as deleted */ - if (ctx->walking_handlers) { + if (qemu_lockcnt_count(&ctx->list_lock)) { node->deleted = 1; node->pfd.revents = 0; } else { /* Otherwise, delete it for real. We can't just mark it as - * deleted because deleted nodes are only cleaned up after - * releasing the walking_handlers lock. + * deleted because deleted nodes are only cleaned up while + * no one is walking the handlers list. */ QLIST_REMOVE(node, node); deleted = true; @@ -243,7 +246,7 @@ void aio_set_fd_handler(AioContext *ctx, /* Alloc and insert if it's not already there */ node = g_new0(AioHandler, 1); node->pfd.fd = fd; - QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node); + QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node); g_source_add_poll(&ctx->source, &node->pfd); is_new = true; @@ -265,6 +268,7 @@ void aio_set_fd_handler(AioContext *ctx, } aio_epoll_update(ctx, node, is_new); + qemu_lockcnt_unlock(&ctx->list_lock); aio_notify(ctx); if (deleted) { @@ -316,8 +320,8 @@ static void poll_set_started(AioContext *ctx, bool started) ctx->poll_started = started; - ctx->walking_handlers++; - QLIST_FOREACH(node, &ctx->aio_handlers, node) { + qemu_lockcnt_inc(&ctx->list_lock); + QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { IOHandler *fn; if (node->deleted) { @@ -334,7 +338,7 @@ static void poll_set_started(AioContext *ctx, bool started) fn(node->opaque); } } - ctx->walking_handlers--; + qemu_lockcnt_dec(&ctx->list_lock); } @@ -349,54 +353,47 @@ bool aio_prepare(AioContext *ctx) bool aio_pending(AioContext *ctx) { AioHandler *node; + bool result = false; - QLIST_FOREACH(node, &ctx->aio_handlers, node) { + /* + * We have to walk very carefully in case aio_set_fd_handler is + * called while we're walking. + */ + qemu_lockcnt_inc(&ctx->list_lock); + + QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { int revents; revents = node->pfd.revents & node->pfd.events; if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read && aio_node_check(ctx, node->is_external)) { - return true; + result = true; + break; } if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write && aio_node_check(ctx, node->is_external)) { - return true; + result = true; + break; } } + qemu_lockcnt_dec(&ctx->list_lock); - return false; + return result; } -/* - * Note that dispatch_fds == false has the side-effect of post-poning the - * freeing of deleted handlers. 
- */ -bool aio_dispatch(AioContext *ctx, bool dispatch_fds) +static bool aio_dispatch_handlers(AioContext *ctx) { - AioHandler *node = NULL; + AioHandler *node, *tmp; bool progress = false; - /* - * If there are callbacks left that have been queued, we need to call them. - * Do not call select in this case, because it is possible that the caller - * does not need a complete flush (as is the case for aio_poll loops). - */ - if (aio_bh_poll(ctx)) { - progress = true; - } - /* * We have to walk very carefully in case aio_set_fd_handler is * called while we're walking. */ - if (dispatch_fds) { - node = QLIST_FIRST(&ctx->aio_handlers); - } - while (node) { - AioHandler *tmp; - int revents; + qemu_lockcnt_inc(&ctx->list_lock); - ctx->walking_handlers++; + QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) { + int revents; revents = node->pfd.revents & node->pfd.events; node->pfd.revents = 0; @@ -420,15 +417,36 @@ bool aio_dispatch(AioContext *ctx, bool dispatch_fds) progress = true; } - tmp = node; - node = QLIST_NEXT(node, node); + if (node->deleted) { + if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) { + QLIST_REMOVE(node, node); + g_free(node); + qemu_lockcnt_inc_and_unlock(&ctx->list_lock); + } + } + } - ctx->walking_handlers--; + qemu_lockcnt_dec(&ctx->list_lock); + return progress; +} - if (!ctx->walking_handlers && tmp->deleted) { - QLIST_REMOVE(tmp, node); - g_free(tmp); - } +/* + * Note that dispatch_fds == false has the side-effect of post-poning the + * freeing of deleted handlers. + */ +bool aio_dispatch(AioContext *ctx, bool dispatch_fds) +{ + bool progress; + + /* + * If there are callbacks left that have been queued, we need to call them. + * Do not call select in this case, because it is possible that the caller + * does not need a complete flush (as is the case for aio_poll loops). + */ + progress = aio_bh_poll(ctx); + + if (dispatch_fds) { + progress |= aio_dispatch_handlers(ctx); } /* Run our timers */ @@ -488,7 +506,7 @@ static bool run_poll_handlers_once(AioContext *ctx) bool progress = false; AioHandler *node; - QLIST_FOREACH(node, &ctx->aio_handlers, node) { + QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { if (!node->deleted && node->io_poll && node->io_poll(node->opaque)) { progress = true; @@ -509,7 +527,7 @@ static bool run_poll_handlers_once(AioContext *ctx) * Note that ctx->notify_me must be non-zero so this function can detect * aio_notify(). * - * Note that the caller must have incremented ctx->walking_handlers. + * Note that the caller must have incremented ctx->list_lock. * * Returns: true if progress was made, false otherwise */ @@ -519,7 +537,7 @@ static bool run_poll_handlers(AioContext *ctx, int64_t max_ns) int64_t end_time; assert(ctx->notify_me); - assert(ctx->walking_handlers > 0); + assert(qemu_lockcnt_count(&ctx->list_lock) > 0); assert(ctx->poll_disable_cnt == 0); trace_run_poll_handlers_begin(ctx, max_ns); @@ -541,7 +559,7 @@ static bool run_poll_handlers(AioContext *ctx, int64_t max_ns) * * ctx->notify_me must be non-zero so this function can detect aio_notify(). * - * Note that the caller must have incremented ctx->walking_handlers. + * Note that the caller must have incremented ctx->list_lock. 
* * Returns: true if progress was made, false otherwise */ @@ -592,7 +610,7 @@ bool aio_poll(AioContext *ctx, bool blocking) atomic_add(&ctx->notify_me, 2); } - ctx->walking_handlers++; + qemu_lockcnt_inc(&ctx->list_lock); if (ctx->poll_max_ns) { start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME); @@ -606,7 +624,7 @@ bool aio_poll(AioContext *ctx, bool blocking) /* fill pollfds */ if (!aio_epoll_enabled(ctx)) { - QLIST_FOREACH(node, &ctx->aio_handlers, node) { + QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { if (!node->deleted && node->pfd.events && aio_node_check(ctx, node->is_external)) { add_pollfd(node); @@ -691,7 +709,7 @@ bool aio_poll(AioContext *ctx, bool blocking) } npfd = 0; - ctx->walking_handlers--; + qemu_lockcnt_dec(&ctx->list_lock); /* Run dispatch even if there were no readable fds to run timers */ if (aio_dispatch(ctx, ret > 0)) { diff --git a/aio-win32.c b/aio-win32.c index d19dc429d8d4..900524c9c24e 100644 --- a/aio-win32.c +++ b/aio-win32.c @@ -21,6 +21,7 @@ #include "qemu/queue.h" #include "qemu/sockets.h" #include "qapi/error.h" +#include "qemu/rcu_queue.h" struct AioHandler { EventNotifier *e; @@ -45,6 +46,7 @@ void aio_set_fd_handler(AioContext *ctx, /* fd is a SOCKET in our case */ AioHandler *node; + qemu_lockcnt_lock(&ctx->list_lock); QLIST_FOREACH(node, &ctx->aio_handlers, node) { if (node->pfd.fd == fd && !node->deleted) { break; @@ -54,14 +56,14 @@ void aio_set_fd_handler(AioContext *ctx, /* Are we deleting the fd handler? */ if (!io_read && !io_write) { if (node) { - /* If the lock is held, just mark the node as deleted */ - if (ctx->walking_handlers) { + /* If aio_poll is in progress, just mark the node as deleted */ + if (qemu_lockcnt_count(&ctx->list_lock)) { node->deleted = 1; node->pfd.revents = 0; } else { /* Otherwise, delete it for real. We can't just mark it as * deleted because deleted nodes are only cleaned up after - * releasing the walking_handlers lock. + * releasing the list_lock. */ QLIST_REMOVE(node, node); g_free(node); @@ -74,7 +76,7 @@ void aio_set_fd_handler(AioContext *ctx, /* Alloc and insert if it's not already there */ node = g_new0(AioHandler, 1); node->pfd.fd = fd; - QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node); + QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node); } node->pfd.events = 0; @@ -99,6 +101,7 @@ void aio_set_fd_handler(AioContext *ctx, FD_CONNECT | FD_WRITE | FD_OOB); } + qemu_lockcnt_unlock(&ctx->list_lock); aio_notify(ctx); } @@ -117,6 +120,7 @@ void aio_set_event_notifier(AioContext *ctx, { AioHandler *node; + qemu_lockcnt_lock(&ctx->list_lock); QLIST_FOREACH(node, &ctx->aio_handlers, node) { if (node->e == e && !node->deleted) { break; @@ -128,14 +132,14 @@ void aio_set_event_notifier(AioContext *ctx, if (node) { g_source_remove_poll(&ctx->source, &node->pfd); - /* If the lock is held, just mark the node as deleted */ - if (ctx->walking_handlers) { + /* aio_poll is in progress, just mark the node as deleted */ + if (qemu_lockcnt_count(&ctx->list_lock)) { node->deleted = 1; node->pfd.revents = 0; } else { /* Otherwise, delete it for real. We can't just mark it as * deleted because deleted nodes are only cleaned up after - * releasing the walking_handlers lock. + * releasing the list_lock. 
*/ QLIST_REMOVE(node, node); g_free(node); @@ -149,7 +153,7 @@ void aio_set_event_notifier(AioContext *ctx, node->pfd.fd = (uintptr_t)event_notifier_get_handle(e); node->pfd.events = G_IO_IN; node->is_external = is_external; - QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node); + QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node); g_source_add_poll(&ctx->source, &node->pfd); } @@ -157,6 +161,7 @@ void aio_set_event_notifier(AioContext *ctx, node->io_notify = io_notify; } + qemu_lockcnt_unlock(&ctx->list_lock); aio_notify(ctx); } @@ -175,10 +180,16 @@ bool aio_prepare(AioContext *ctx) bool have_select_revents = false; fd_set rfds, wfds; + /* + * We have to walk very carefully in case aio_set_fd_handler is + * called while we're walking. + */ + qemu_lockcnt_inc(&ctx->list_lock); + /* fill fd sets */ FD_ZERO(&rfds); FD_ZERO(&wfds); - QLIST_FOREACH(node, &ctx->aio_handlers, node) { + QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { if (node->io_read) { FD_SET ((SOCKET)node->pfd.fd, &rfds); } @@ -188,7 +199,7 @@ bool aio_prepare(AioContext *ctx) } if (select(0, &rfds, &wfds, NULL, &tv0) > 0) { - QLIST_FOREACH(node, &ctx->aio_handlers, node) { + QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { node->pfd.revents = 0; if (FD_ISSET(node->pfd.fd, &rfds)) { node->pfd.revents |= G_IO_IN; @@ -202,45 +213,55 @@ bool aio_prepare(AioContext *ctx) } } + qemu_lockcnt_dec(&ctx->list_lock); return have_select_revents; } bool aio_pending(AioContext *ctx) { AioHandler *node; + bool result = false; - QLIST_FOREACH(node, &ctx->aio_handlers, node) { + /* + * We have to walk very carefully in case aio_set_fd_handler is + * called while we're walking. + */ + qemu_lockcnt_inc(&ctx->list_lock); + QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { if (node->pfd.revents && node->io_notify) { - return true; + result = true; + break; } if ((node->pfd.revents & G_IO_IN) && node->io_read) { - return true; + result = true; + break; } if ((node->pfd.revents & G_IO_OUT) && node->io_write) { - return true; + result = true; + break; } } - return false; + qemu_lockcnt_dec(&ctx->list_lock); + return result; } static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event) { AioHandler *node; bool progress = false; + AioHandler *tmp; + + qemu_lockcnt_inc(&ctx->list_lock); /* * We have to walk very carefully in case aio_set_fd_handler is * called while we're walking. 
*/ - node = QLIST_FIRST(&ctx->aio_handlers); - while (node) { - AioHandler *tmp; + QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) { int revents = node->pfd.revents; - ctx->walking_handlers++; - if (!node->deleted && (revents || event_notifier_get_handle(node->e) == event) && node->io_notify) { @@ -275,17 +296,16 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event) } } - tmp = node; - node = QLIST_NEXT(node, node); - - ctx->walking_handlers--; - - if (!ctx->walking_handlers && tmp->deleted) { - QLIST_REMOVE(tmp, node); - g_free(tmp); + if (node->deleted) { + if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) { + QLIST_REMOVE(node, node); + g_free(node); + qemu_lockcnt_inc_and_unlock(&ctx->list_lock); + } } } + qemu_lockcnt_dec(&ctx->list_lock); return progress; } @@ -323,20 +343,19 @@ bool aio_poll(AioContext *ctx, bool blocking) atomic_add(&ctx->notify_me, 2); } + qemu_lockcnt_inc(&ctx->list_lock); have_select_revents = aio_prepare(ctx); - ctx->walking_handlers++; - /* fill fd sets */ count = 0; - QLIST_FOREACH(node, &ctx->aio_handlers, node) { + QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { if (!node->deleted && node->io_notify && aio_node_check(ctx, node->is_external)) { events[count++] = event_notifier_get_handle(node->e); } } - ctx->walking_handlers--; + qemu_lockcnt_dec(&ctx->list_lock); first = true; /* ctx->notifier is always registered. */ diff --git a/async.c b/async.c index 29601718345c..0d218ab0e083 100644 --- a/async.c +++ b/async.c @@ -53,14 +53,14 @@ void aio_bh_schedule_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque) .cb = cb, .opaque = opaque, }; - qemu_mutex_lock(&ctx->bh_lock); + qemu_lockcnt_lock(&ctx->list_lock); bh->next = ctx->first_bh; bh->scheduled = 1; bh->deleted = 1; /* Make sure that the members are ready before putting bh into list */ smp_wmb(); ctx->first_bh = bh; - qemu_mutex_unlock(&ctx->bh_lock); + qemu_lockcnt_unlock(&ctx->list_lock); aio_notify(ctx); } @@ -73,12 +73,12 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque) .cb = cb, .opaque = opaque, }; - qemu_mutex_lock(&ctx->bh_lock); + qemu_lockcnt_lock(&ctx->list_lock); bh->next = ctx->first_bh; /* Make sure that the members are ready before putting bh into list */ smp_wmb(); ctx->first_bh = bh; - qemu_mutex_unlock(&ctx->bh_lock); + qemu_lockcnt_unlock(&ctx->list_lock); return bh; } @@ -92,14 +92,13 @@ int aio_bh_poll(AioContext *ctx) { QEMUBH *bh, **bhp, *next; int ret; + bool deleted = false; - ctx->walking_bh++; + qemu_lockcnt_inc(&ctx->list_lock); ret = 0; - for (bh = ctx->first_bh; bh; bh = next) { - /* Make sure that fetching bh happens before accessing its members */ - smp_read_barrier_depends(); - next = bh->next; + for (bh = atomic_rcu_read(&ctx->first_bh); bh; bh = next) { + next = atomic_rcu_read(&bh->next); /* The atomic_xchg is paired with the one in qemu_bh_schedule. The * implicit memory barrier ensures that the callback sees all writes * done by the scheduling thread. 
It also ensures that the scheduling @@ -114,13 +113,18 @@ int aio_bh_poll(AioContext *ctx) bh->idle = 0; aio_bh_call(bh); } + if (bh->deleted) { + deleted = true; + } } - ctx->walking_bh--; - /* remove deleted bhs */ - if (!ctx->walking_bh) { - qemu_mutex_lock(&ctx->bh_lock); + if (!deleted) { + qemu_lockcnt_dec(&ctx->list_lock); + return ret; + } + + if (qemu_lockcnt_dec_and_lock(&ctx->list_lock)) { bhp = &ctx->first_bh; while (*bhp) { bh = *bhp; @@ -131,9 +135,8 @@ int aio_bh_poll(AioContext *ctx) bhp = &bh->next; } } - qemu_mutex_unlock(&ctx->bh_lock); + qemu_lockcnt_unlock(&ctx->list_lock); } - return ret; } @@ -187,7 +190,8 @@ aio_compute_timeout(AioContext *ctx) int timeout = -1; QEMUBH *bh; - for (bh = ctx->first_bh; bh; bh = bh->next) { + for (bh = atomic_rcu_read(&ctx->first_bh); bh; + bh = atomic_rcu_read(&bh->next)) { if (bh->scheduled) { if (bh->idle) { /* idle bottom halves will be polled at least @@ -270,7 +274,8 @@ aio_ctx_finalize(GSource *source) } #endif - qemu_mutex_lock(&ctx->bh_lock); + qemu_lockcnt_lock(&ctx->list_lock); + assert(!qemu_lockcnt_count(&ctx->list_lock)); while (ctx->first_bh) { QEMUBH *next = ctx->first_bh->next; @@ -280,12 +285,12 @@ aio_ctx_finalize(GSource *source) g_free(ctx->first_bh); ctx->first_bh = next; } - qemu_mutex_unlock(&ctx->bh_lock); + qemu_lockcnt_unlock(&ctx->list_lock); aio_set_event_notifier(ctx, &ctx->notifier, false, NULL, NULL); event_notifier_cleanup(&ctx->notifier); qemu_rec_mutex_destroy(&ctx->lock); - qemu_mutex_destroy(&ctx->bh_lock); + qemu_lockcnt_destroy(&ctx->list_lock); timerlistgroup_deinit(&ctx->tlg); } @@ -372,6 +377,7 @@ AioContext *aio_context_new(Error **errp) goto fail; } g_source_set_can_recurse(&ctx->source, true); + qemu_lockcnt_init(&ctx->list_lock); aio_set_event_notifier(ctx, &ctx->notifier, false, (EventNotifierHandler *) @@ -381,7 +387,6 @@ AioContext *aio_context_new(Error **errp) ctx->linux_aio = NULL; #endif ctx->thread_pool = NULL; - qemu_mutex_init(&ctx->bh_lock); qemu_rec_mutex_init(&ctx->lock); timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx); diff --git a/block/io.c b/block/io.c index 4f005623f71e..c42b34a96534 100644 --- a/block/io.c +++ b/block/io.c @@ -228,9 +228,7 @@ void bdrv_drained_begin(BlockDriverState *bs) bdrv_parent_drained_begin(bs); } - bdrv_io_unplugged_begin(bs); bdrv_drain_recurse(bs); - bdrv_io_unplugged_end(bs); } void bdrv_drained_end(BlockDriverState *bs) @@ -302,7 +300,6 @@ void bdrv_drain_all_begin(void) aio_context_acquire(aio_context); bdrv_parent_drained_begin(bs); - bdrv_io_unplugged_begin(bs); aio_disable_external(aio_context); aio_context_release(aio_context); @@ -347,7 +344,6 @@ void bdrv_drain_all_end(void) aio_context_acquire(aio_context); aio_enable_external(aio_context); - bdrv_io_unplugged_end(bs); bdrv_parent_drained_end(bs); aio_context_release(aio_context); } @@ -2650,7 +2646,7 @@ void bdrv_io_plug(BlockDriverState *bs) bdrv_io_plug(child->bs); } - if (bs->io_plugged++ == 0 && bs->io_plug_disabled == 0) { + if (bs->io_plugged++ == 0) { BlockDriver *drv = bs->drv; if (drv && drv->bdrv_io_plug) { drv->bdrv_io_plug(bs); @@ -2663,7 +2659,7 @@ void bdrv_io_unplug(BlockDriverState *bs) BdrvChild *child; assert(bs->io_plugged); - if (--bs->io_plugged == 0 && bs->io_plug_disabled == 0) { + if (--bs->io_plugged == 0) { BlockDriver *drv = bs->drv; if (drv && drv->bdrv_io_unplug) { drv->bdrv_io_unplug(bs); @@ -2674,36 +2670,3 @@ void bdrv_io_unplug(BlockDriverState *bs) bdrv_io_unplug(child->bs); } } - -void bdrv_io_unplugged_begin(BlockDriverState *bs) -{ - 
BdrvChild *child; - - if (bs->io_plug_disabled++ == 0 && bs->io_plugged > 0) { - BlockDriver *drv = bs->drv; - if (drv && drv->bdrv_io_unplug) { - drv->bdrv_io_unplug(bs); - } - } - - QLIST_FOREACH(child, &bs->children, next) { - bdrv_io_unplugged_begin(child->bs); - } -} - -void bdrv_io_unplugged_end(BlockDriverState *bs) -{ - BdrvChild *child; - - assert(bs->io_plug_disabled); - QLIST_FOREACH(child, &bs->children, next) { - bdrv_io_unplugged_end(child->bs); - } - - if (--bs->io_plug_disabled == 0 && bs->io_plugged > 0) { - BlockDriver *drv = bs->drv; - if (drv && drv->bdrv_io_plug) { - drv->bdrv_io_plug(bs); - } - } -} diff --git a/docs/lockcnt.txt b/docs/lockcnt.txt new file mode 100644 index 000000000000..2a79b3205b47 --- /dev/null +++ b/docs/lockcnt.txt @@ -0,0 +1,277 @@ +DOCUMENTATION FOR LOCKED COUNTERS (aka QemuLockCnt) +=================================================== + +QEMU often uses reference counts to track data structures that are being +accessed and should not be freed. For example, a loop that invoke +callbacks like this is not safe: + + QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) { + if (ioh->revents & G_IO_OUT) { + ioh->fd_write(ioh->opaque); + } + } + +QLIST_FOREACH_SAFE protects against deletion of the current node (ioh) +by stashing away its "next" pointer. However, ioh->fd_write could +actually delete the next node from the list. The simplest way to +avoid this is to mark the node as deleted, and remove it from the +list in the above loop: + + QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) { + if (ioh->deleted) { + QLIST_REMOVE(ioh, next); + g_free(ioh); + } else { + if (ioh->revents & G_IO_OUT) { + ioh->fd_write(ioh->opaque); + } + } + } + +If however this loop must also be reentrant, i.e. it is possible that +ioh->fd_write invokes the loop again, some kind of counting is needed: + + walking_handlers++; + QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) { + if (ioh->deleted) { + if (walking_handlers == 1) { + QLIST_REMOVE(ioh, next); + g_free(ioh); + } + } else { + if (ioh->revents & G_IO_OUT) { + ioh->fd_write(ioh->opaque); + } + } + } + walking_handlers--; + +One may think of using the RCU primitives, rcu_read_lock() and +rcu_read_unlock(); effectively, the RCU nesting count would take +the place of the walking_handlers global variable. Indeed, +reference counting and RCU have similar purposes, but their usage in +general is complementary: + +- reference counting is fine-grained and limited to a single data + structure; RCU delays reclamation of *all* RCU-protected data + structures; + +- reference counting works even in the presence of code that keeps + a reference for a long time; RCU critical sections in principle + should be kept short; + +- reference counting is often applied to code that is not thread-safe + but is reentrant; in fact, usage of reference counting in QEMU predates + the introduction of threads by many years. RCU is generally used to + protect readers from other threads freeing memory after concurrent + modifications to a data structure. + +- reclaiming data can be done by a separate thread in the case of RCU; + this can improve performance, but also delay reclamation undesirably. + With reference counting, reclamation is deterministic. + +This file documents QemuLockCnt, an abstraction for using reference +counting in code that has to be both thread-safe and reentrant. 
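+
+To make the comparison concrete, here is a sketch of how the same loop
+could look with QEMU's RCU primitives and RCU-protected list macros,
+with reclamation deferred to an RCU grace period instead of being
+counted.  This is illustration only, not code from the QEMU tree; the
+io_handlers/ioh names are the ones used in the examples above, and the
+"struct rcu_head rcu" member of the handler struct is assumed for the
+example:
+
+    /* reader */
+    rcu_read_lock();
+    QLIST_FOREACH_RCU(ioh, &io_handlers, next) {
+        if (ioh->revents & G_IO_OUT) {
+            ioh->fd_write(ioh->opaque);
+        }
+    }
+    rcu_read_unlock();
+
+    /* writer, serialized by some mutex */
+    QLIST_REMOVE_RCU(ioh, next);
+    g_free_rcu(ioh, rcu);   /* "rcu" is the assumed rcu_head member */
+
+The rest of this document describes how QemuLockCnt achieves a similar
+effect with a per-structure counter, so that reclamation stays
+deterministic and local to a single data structure.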
+ + +QemuLockCnt concepts +-------------------- + +A QemuLockCnt comprises both a counter and a mutex; it has primitives +to increment and decrement the counter, and to take and release the +mutex. The counter notes how many visits to the data structures are +taking place (the visits could be from different threads, or there could +be multiple reentrant visits from the same thread). The basic rules +governing the counter/mutex pair then are the following: + +- Data protected by the QemuLockCnt must not be freed unless the + counter is zero and the mutex is taken. + +- A new visit cannot be started while the counter is zero and the + mutex is taken. + +Most of the time, the mutex protects all writes to the data structure, +not just frees, though there could be cases where this is not necessary. + +Reads, instead, can be done without taking the mutex, as long as the +readers and writers use the same macros that are used for RCU, for +example atomic_rcu_read, atomic_rcu_set, QLIST_FOREACH_RCU, etc. This is +because the reads are done outside a lock and a set or QLIST_INSERT_HEAD +can happen concurrently with the read. The RCU API ensures that the +processor and the compiler see all required memory barriers. + +This could be implemented simply by protecting the counter with the +mutex, for example: + + // (1) + qemu_mutex_lock(&walking_handlers_mutex); + walking_handlers++; + qemu_mutex_unlock(&walking_handlers_mutex); + + ... + + // (2) + qemu_mutex_lock(&walking_handlers_mutex); + if (--walking_handlers == 0) { + QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) { + if (ioh->deleted) { + QLIST_REMOVE(ioh, next); + g_free(ioh); + } + } + } + qemu_mutex_unlock(&walking_handlers_mutex); + +Here, no frees can happen in the code represented by the ellipsis. +If another thread is executing critical section (2), that part of +the code cannot be entered, because the thread will not be able +to increment the walking_handlers variable. And of course +during the visit any other thread will see a nonzero value for +walking_handlers, as in the single-threaded code. + +Note that it is possible for multiple concurrent accesses to delay +the cleanup arbitrarily; in other words, for the walking_handlers +counter to never become zero. For this reason, this technique is +more easily applicable if concurrent access to the structure is rare. + +However, critical sections are easy to forget since you have to do +them for each modification of the counter. QemuLockCnt ensures that +all modifications of the counter take the lock appropriately, and it +can also be more efficient in two ways: + +- it avoids taking the lock for many operations (for example + incrementing the counter while it is non-zero); + +- on some platforms, one can implement QemuLockCnt to hold the lock + and the mutex in a single word, making the fast path no more expensive + than simply managing a counter using atomic operations (see + docs/atomics.txt). This can be very helpful if concurrent access to + the data structure is expected to be rare. + + +Using the same mutex for frees and writes can still incur some small +inefficiencies; for example, a visit can never start if the counter is +zero and the mutex is taken---even if the mutex is taken by a write, +which in principle need not block a visit of the data structure. +However, these are usually not a problem if any of the following +assumptions are valid: + +- concurrent access is possible but rare + +- writes are rare + +- writes are frequent, but this kind of write (e.g. 
appending to a + list) has a very small critical section. + +For example, QEMU uses QemuLockCnt to manage an AioContext's list of +bottom halves and file descriptor handlers. Modifications to the list +of file descriptor handlers are rare. Creation of a new bottom half is +frequent and can happen on a fast path; however: 1) it is almost never +concurrent with a visit to the list of bottom halves; 2) it only has +three instructions in the critical path, two assignments and a smp_wmb(). + + +QemuLockCnt API +--------------- + +The QemuLockCnt API is described in include/qemu/thread.h. + + +QemuLockCnt usage +----------------- + +This section explains the typical usage patterns for QemuLockCnt functions. + +Setting a variable to a non-NULL value can be done between +qemu_lockcnt_lock and qemu_lockcnt_unlock: + + qemu_lockcnt_lock(&xyz_lockcnt); + if (!xyz) { + new_xyz = g_new(XYZ, 1); + ... + atomic_rcu_set(&xyz, new_xyz); + } + qemu_lockcnt_unlock(&xyz_lockcnt); + +Accessing the value can be done between qemu_lockcnt_inc and +qemu_lockcnt_dec: + + qemu_lockcnt_inc(&xyz_lockcnt); + if (xyz) { + XYZ *p = atomic_rcu_read(&xyz); + ... + /* Accesses can now be done through "p". */ + } + qemu_lockcnt_dec(&xyz_lockcnt); + +Freeing the object can similarly use qemu_lockcnt_lock and +qemu_lockcnt_unlock, but you also need to ensure that the count +is zero (i.e. there is no concurrent visit). Because qemu_lockcnt_inc +takes the QemuLockCnt's lock, the count cannot become non-zero while +the object is being freed. Freeing an object looks like this: + + qemu_lockcnt_lock(&xyz_lockcnt); + if (!qemu_lockcnt_count(&xyz_lockcnt)) { + g_free(xyz); + xyz = NULL; + } + qemu_lockcnt_unlock(&xyz_lockcnt); + +If an object has to be freed right after a visit, you can combine +the decrement, the locking and the check on count as follows: + + qemu_lockcnt_inc(&xyz_lockcnt); + if (xyz) { + XYZ *p = atomic_rcu_read(&xyz); + ... + /* Accesses can now be done through "p". */ + } + if (qemu_lockcnt_dec_and_lock(&xyz_lockcnt)) { + g_free(xyz); + xyz = NULL; + qemu_lockcnt_unlock(&xyz_lockcnt); + } + +QemuLockCnt can also be used to access a list as follows: + + qemu_lockcnt_inc(&io_handlers_lockcnt); + QLIST_FOREACH_RCU(ioh, &io_handlers, pioh) { + if (ioh->revents & G_IO_OUT) { + ioh->fd_write(ioh->opaque); + } + } + + if (qemu_lockcnt_dec_and_lock(&io_handlers_lockcnt)) { + QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) { + if (ioh->deleted) { + QLIST_REMOVE(ioh, next); + g_free(ioh); + } + } + qemu_lockcnt_unlock(&io_handlers_lockcnt); + } + +Again, the RCU primitives are used because new items can be added to the +list during the walk. QLIST_FOREACH_RCU ensures that the processor and +the compiler see the appropriate memory barriers. + +An alternative pattern uses qemu_lockcnt_dec_if_lock: + + qemu_lockcnt_inc(&io_handlers_lockcnt); + QLIST_FOREACH_SAFE_RCU(ioh, &io_handlers, next, pioh) { + if (ioh->deleted) { + if (qemu_lockcnt_dec_if_lock(&io_handlers_lockcnt)) { + QLIST_REMOVE(ioh, next); + g_free(ioh); + qemu_lockcnt_inc_and_unlock(&io_handlers_lockcnt); + } + } else { + if (ioh->revents & G_IO_OUT) { + ioh->fd_write(ioh->opaque); + } + } + } + qemu_lockcnt_dec(&io_handlers_lockcnt); + +Here you can use qemu_lockcnt_dec instead of qemu_lockcnt_dec_and_lock, +because there is no special task to do if the count goes from 1 to 0. 
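+
+The writer side that removes an element usually takes the lock and uses
+qemu_lockcnt_count to decide whether the element can be freed right away
+or must only be marked as deleted.  The following sketch is illustration
+only; it reuses the io_handlers example above, with ioh being the handler
+to remove, and roughly mirrors what aio_set_fd_handler does in this patch:
+
+    qemu_lockcnt_lock(&io_handlers_lockcnt);
+    if (qemu_lockcnt_count(&io_handlers_lockcnt)) {
+        /* Visits are in progress; one of them will free the node. */
+        ioh->deleted = 1;
+    } else {
+        /* The count is zero and we hold the lock, so no new visit can
+         * start (qemu_lockcnt_inc would block on the lock): free it now.
+         */
+        QLIST_REMOVE(ioh, next);
+        g_free(ioh);
+    }
+    qemu_lockcnt_unlock(&io_handlers_lockcnt);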
diff --git a/docs/multiple-iothreads.txt b/docs/multiple-iothreads.txt index 0e7cdb2c28c4..e4d340bbb7fe 100644 --- a/docs/multiple-iothreads.txt +++ b/docs/multiple-iothreads.txt @@ -84,9 +84,8 @@ How to synchronize with an IOThread AioContext is not thread-safe so some rules must be followed when using file descriptors, event notifiers, timers, or BHs across threads: -1. AioContext functions can be called safely from file descriptor, event -notifier, timer, or BH callbacks invoked by the AioContext. No locking is -necessary. +1. AioContext functions can always be called safely. They handle their +own locking internally. 2. Other threads wishing to access the AioContext must use aio_context_acquire()/aio_context_release() for mutual exclusion. Once the @@ -94,16 +93,14 @@ context is acquired no other thread can access it or run event loop iterations in this AioContext. aio_context_acquire()/aio_context_release() calls may be nested. This -means you can call them if you're not sure whether #1 applies. +means you can call them if you're not sure whether #2 applies. There is currently no lock ordering rule if a thread needs to acquire multiple AioContexts simultaneously. Therefore, it is only safe for code holding the QEMU global mutex to acquire other AioContexts. -Side note: the best way to schedule a function call across threads is to create -a BH in the target AioContext beforehand and then call qemu_bh_schedule(). No -acquire/release or locking is needed for the qemu_bh_schedule() call. But be -sure to acquire the AioContext for aio_bh_new() if necessary. +Side note: the best way to schedule a function call across threads is to call +aio_bh_schedule_oneshot(). No acquire/release or locking is needed. AioContext and the block layer ------------------------------ diff --git a/include/block/aio.h b/include/block/aio.h index 4dca54d9c7e3..7df271d2b968 100644 --- a/include/block/aio.h +++ b/include/block/aio.h @@ -53,18 +53,12 @@ struct LinuxAioState; struct AioContext { GSource source; - /* Protects all fields from multi-threaded access */ + /* Used by AioContext users to protect from multi-threaded access. */ QemuRecMutex lock; - /* The list of registered AIO handlers */ + /* The list of registered AIO handlers. Protected by ctx->list_lock. */ QLIST_HEAD(, AioHandler) aio_handlers; - /* This is a simple lock used to protect the aio_handlers list. - * Specifically, it's used to ensure that no callbacks are removed while - * we're walking and dispatching callbacks. - */ - int walking_handlers; - /* Used to avoid unnecessary event_notifier_set calls in aio_notify; * accessed with atomic primitives. If this field is 0, everything * (file descriptors, bottom halves, timers) will be re-evaluated @@ -90,17 +84,15 @@ struct AioContext { */ uint32_t notify_me; - /* lock to protect between bh's adders and deleter */ - QemuMutex bh_lock; + /* A lock to protect between QEMUBH and AioHandler adders and deleter, + * and to ensure that no callbacks are removed while we're walking and + * dispatching them. + */ + QemuLockCnt list_lock; /* Anchor of the list of Bottom Halves belonging to the context */ struct QEMUBH *first_bh; - /* A simple lock used to protect the first_bh list, and ensure that - * no callbacks are removed while we're walking and dispatching callbacks. - */ - int walking_bh; - /* Used by aio_notify. 
* * "notified" is used to avoid expensive event_notifier_test_and_clear @@ -116,7 +108,9 @@ struct AioContext { bool notified; EventNotifier notifier; - /* Thread pool for performing work and receiving completion callbacks */ + /* Thread pool for performing work and receiving completion callbacks. + * Has its own locking. + */ struct ThreadPool *thread_pool; #ifdef CONFIG_LINUX_AIO @@ -126,7 +120,9 @@ struct AioContext { struct LinuxAioState *linux_aio; #endif - /* TimerLists for calling timers - one per clock type */ + /* TimerLists for calling timers - one per clock type. Has its own + * locking. + */ QEMUTimerListGroup tlg; int external_disable_cnt; @@ -180,9 +176,11 @@ void aio_context_unref(AioContext *ctx); * automatically takes care of calling aio_context_acquire and * aio_context_release. * - * Access to timers and BHs from a thread that has not acquired AioContext - * is possible. Access to callbacks for now must be done while the AioContext - * is owned by the thread (FIXME). + * Note that this is separate from bdrv_drained_begin/bdrv_drained_end. A + * thread still has to call those to avoid being interrupted by the guest. + * + * Bottom halves, timers and callbacks can be created or removed without + * acquiring the AioContext. */ void aio_context_acquire(AioContext *ctx); diff --git a/include/block/block.h b/include/block/block.h index 49bb0b239a50..8b0dcdaa707f 100644 --- a/include/block/block.h +++ b/include/block/block.h @@ -526,8 +526,6 @@ int bdrv_probe_geometry(BlockDriverState *bs, HDGeometry *geo); void bdrv_io_plug(BlockDriverState *bs); void bdrv_io_unplug(BlockDriverState *bs); -void bdrv_io_unplugged_begin(BlockDriverState *bs); -void bdrv_io_unplugged_end(BlockDriverState *bs); /** * bdrv_drained_begin: diff --git a/include/block/block_int.h b/include/block/block_int.h index 4e4562d44447..2d92d7edfeb3 100644 --- a/include/block/block_int.h +++ b/include/block/block_int.h @@ -526,9 +526,8 @@ struct BlockDriverState { uint64_t write_threshold_offset; NotifierWithReturn write_threshold_notifier; - /* counters for nested bdrv_io_plug and bdrv_io_unplugged_begin */ + /* counter for nested bdrv_io_plug */ unsigned io_plugged; - unsigned io_plug_disabled; int quiesce_counter; }; diff --git a/include/qemu/futex.h b/include/qemu/futex.h new file mode 100644 index 000000000000..bb7dc9e296c1 --- /dev/null +++ b/include/qemu/futex.h @@ -0,0 +1,36 @@ +/* + * Wrappers around Linux futex syscall + * + * Copyright Red Hat, Inc. 2017 + * + * Author: + * Paolo Bonzini + * + * This work is licensed under the terms of the GNU GPL, version 2 or later. + * See the COPYING file in the top-level directory. + * + */ + +#include +#include + +#define qemu_futex(...) 
syscall(__NR_futex, __VA_ARGS__) + +static inline void qemu_futex_wake(void *f, int n) +{ + qemu_futex(f, FUTEX_WAKE, n, NULL, NULL, 0); +} + +static inline void qemu_futex_wait(void *f, unsigned val) +{ + while (qemu_futex(f, FUTEX_WAIT, (int) val, NULL, NULL, 0)) { + switch (errno) { + case EWOULDBLOCK: + return; + case EINTR: + break; /* get out of switch and retry */ + default: + abort(); + } + } +} diff --git a/include/qemu/thread.h b/include/qemu/thread.h index e8e665f020fa..9910f49b3a83 100644 --- a/include/qemu/thread.h +++ b/include/qemu/thread.h @@ -8,6 +8,7 @@ typedef struct QemuMutex QemuMutex; typedef struct QemuCond QemuCond; typedef struct QemuSemaphore QemuSemaphore; typedef struct QemuEvent QemuEvent; +typedef struct QemuLockCnt QemuLockCnt; typedef struct QemuThread QemuThread; #ifdef _WIN32 @@ -98,4 +99,115 @@ static inline void qemu_spin_unlock(QemuSpin *spin) __sync_lock_release(&spin->value); } +struct QemuLockCnt { +#ifndef CONFIG_LINUX + QemuMutex mutex; +#endif + unsigned count; +}; + +/** + * qemu_lockcnt_init: initialize a QemuLockcnt + * @lockcnt: the lockcnt to initialize + * + * Initialize lockcnt's counter to zero and prepare its mutex + * for usage. + */ +void qemu_lockcnt_init(QemuLockCnt *lockcnt); + +/** + * qemu_lockcnt_destroy: destroy a QemuLockcnt + * @lockcnt: the lockcnt to destruct + * + * Destroy lockcnt's mutex. + */ +void qemu_lockcnt_destroy(QemuLockCnt *lockcnt); + +/** + * qemu_lockcnt_inc: increment a QemuLockCnt's counter + * @lockcnt: the lockcnt to operate on + * + * If the lockcnt's count is zero, wait for critical sections + * to finish and increment lockcnt's count to 1. If the count + * is not zero, just increment it. + * + * Because this function can wait on the mutex, it must not be + * called while the lockcnt's mutex is held by the current thread. + * For the same reason, qemu_lockcnt_inc can also contribute to + * AB-BA deadlocks. This is a sample deadlock scenario: + * + * thread 1 thread 2 + * ------------------------------------------------------- + * qemu_lockcnt_lock(&lc1); + * qemu_lockcnt_lock(&lc2); + * qemu_lockcnt_inc(&lc2); + * qemu_lockcnt_inc(&lc1); + */ +void qemu_lockcnt_inc(QemuLockCnt *lockcnt); + +/** + * qemu_lockcnt_dec: decrement a QemuLockCnt's counter + * @lockcnt: the lockcnt to operate on + */ +void qemu_lockcnt_dec(QemuLockCnt *lockcnt); + +/** + * qemu_lockcnt_dec_and_lock: decrement a QemuLockCnt's counter and + * possibly lock it. + * @lockcnt: the lockcnt to operate on + * + * Decrement lockcnt's count. If the new count is zero, lock + * the mutex and return true. Otherwise, return false. + */ +bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt); + +/** + * qemu_lockcnt_dec_if_lock: possibly decrement a QemuLockCnt's counter and + * lock it. + * @lockcnt: the lockcnt to operate on + * + * If the count is 1, decrement the count to zero, lock + * the mutex and return true. Otherwise, return false. + */ +bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt); + +/** + * qemu_lockcnt_lock: lock a QemuLockCnt's mutex. + * @lockcnt: the lockcnt to operate on + * + * Remember that concurrent visits are not blocked unless the count is + * also zero. You can use qemu_lockcnt_count to check for this inside a + * critical section. + */ +void qemu_lockcnt_lock(QemuLockCnt *lockcnt); + +/** + * qemu_lockcnt_unlock: release a QemuLockCnt's mutex. + * @lockcnt: the lockcnt to operate on. 
+ */ +void qemu_lockcnt_unlock(QemuLockCnt *lockcnt); + +/** + * qemu_lockcnt_inc_and_unlock: combined unlock/increment on a QemuLockCnt. + * @lockcnt: the lockcnt to operate on. + * + * This is the same as + * + * qemu_lockcnt_unlock(lockcnt); + * qemu_lockcnt_inc(lockcnt); + * + * but more efficient. + */ +void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt); + +/** + * qemu_lockcnt_count: query a LockCnt's count. + * @lockcnt: the lockcnt to query. + * + * Note that the count can change at any time. Still, while the + * lockcnt is locked, one can usefully check whether the count + * is non-zero. + */ +unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt); + #endif diff --git a/util/Makefile.objs b/util/Makefile.objs index ad0f9c7fe4c7..c1f247d67535 100644 --- a/util/Makefile.objs +++ b/util/Makefile.objs @@ -1,5 +1,6 @@ util-obj-y = osdep.o cutils.o unicode.o qemu-timer-common.o util-obj-y += bufferiszero.o +util-obj-y += lockcnt.o util-obj-$(CONFIG_POSIX) += compatfd.o util-obj-$(CONFIG_POSIX) += event_notifier-posix.o util-obj-$(CONFIG_POSIX) += mmap-alloc.o diff --git a/util/lockcnt.c b/util/lockcnt.c new file mode 100644 index 000000000000..4f88dcf8b898 --- /dev/null +++ b/util/lockcnt.c @@ -0,0 +1,397 @@ +/* + * QemuLockCnt implementation + * + * Copyright Red Hat, Inc. 2017 + * + * Author: + * Paolo Bonzini + */ +#include "qemu/osdep.h" +#include "qemu/thread.h" +#include "qemu/atomic.h" +#include "trace.h" + +#ifdef CONFIG_LINUX +#include "qemu/futex.h" + +/* On Linux, bits 0-1 are a futex-based lock, bits 2-31 are the counter. + * For the mutex algorithm see Ulrich Drepper's "Futexes Are Tricky" (ok, + * this is not the most relaxing citation I could make...). It is similar + * to mutex2 in the paper. + */ + +#define QEMU_LOCKCNT_STATE_MASK 3 +#define QEMU_LOCKCNT_STATE_FREE 0 /* free, uncontended */ +#define QEMU_LOCKCNT_STATE_LOCKED 1 /* locked, uncontended */ +#define QEMU_LOCKCNT_STATE_WAITING 2 /* locked, contended */ + +#define QEMU_LOCKCNT_COUNT_STEP 4 +#define QEMU_LOCKCNT_COUNT_SHIFT 2 + +void qemu_lockcnt_init(QemuLockCnt *lockcnt) +{ + lockcnt->count = 0; +} + +void qemu_lockcnt_destroy(QemuLockCnt *lockcnt) +{ +} + +/* *val is the current value of lockcnt->count. + * + * If the lock is free, try a cmpxchg from *val to new_if_free; return + * true and set *val to the old value found by the cmpxchg in + * lockcnt->count. + * + * If the lock is taken, wait for it to be released and return false + * *without trying again to take the lock*. Again, set *val to the + * new value of lockcnt->count. + * + * If *waited is true on return, new_if_free's bottom two bits must not + * be QEMU_LOCKCNT_STATE_LOCKED on subsequent calls, because the caller + * does not know if there are other waiters. Furthermore, after *waited + * is set the caller has effectively acquired the lock. If it returns + * with the lock not taken, it must wake another futex waiter. + */ +static bool qemu_lockcnt_cmpxchg_or_wait(QemuLockCnt *lockcnt, int *val, + int new_if_free, bool *waited) +{ + /* Fast path for when the lock is free. */ + if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_FREE) { + int expected = *val; + + trace_lockcnt_fast_path_attempt(lockcnt, expected, new_if_free); + *val = atomic_cmpxchg(&lockcnt->count, expected, new_if_free); + if (*val == expected) { + trace_lockcnt_fast_path_success(lockcnt, expected, new_if_free); + *val = new_if_free; + return true; + } + } + + /* The slow path moves from locked to waiting if necessary, then + * does a futex wait. 
Both steps can be repeated ad nauseam, + * only getting out of the loop if we can have another shot at the + * fast path. Once we can, get out to compute the new destination + * value for the fast path. + */ + while ((*val & QEMU_LOCKCNT_STATE_MASK) != QEMU_LOCKCNT_STATE_FREE) { + if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_LOCKED) { + int expected = *val; + int new = expected - QEMU_LOCKCNT_STATE_LOCKED + QEMU_LOCKCNT_STATE_WAITING; + + trace_lockcnt_futex_wait_prepare(lockcnt, expected, new); + *val = atomic_cmpxchg(&lockcnt->count, expected, new); + if (*val == expected) { + *val = new; + } + continue; + } + + if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_WAITING) { + *waited = true; + trace_lockcnt_futex_wait(lockcnt, *val); + qemu_futex_wait(&lockcnt->count, *val); + *val = atomic_read(&lockcnt->count); + trace_lockcnt_futex_wait_resume(lockcnt, *val); + continue; + } + + abort(); + } + return false; +} + +static void lockcnt_wake(QemuLockCnt *lockcnt) +{ + trace_lockcnt_futex_wake(lockcnt); + qemu_futex_wake(&lockcnt->count, 1); +} + +void qemu_lockcnt_inc(QemuLockCnt *lockcnt) +{ + int val = atomic_read(&lockcnt->count); + bool waited = false; + + for (;;) { + if (val >= QEMU_LOCKCNT_COUNT_STEP) { + int expected = val; + val = atomic_cmpxchg(&lockcnt->count, val, val + QEMU_LOCKCNT_COUNT_STEP); + if (val == expected) { + break; + } + } else { + /* The fast path is (0, unlocked)->(1, unlocked). */ + if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, QEMU_LOCKCNT_COUNT_STEP, + &waited)) { + break; + } + } + } + + /* If we were woken by another thread, we should also wake one because + * we are effectively releasing the lock that was given to us. This is + * the case where qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING + * in the low bits, and qemu_lockcnt_inc_and_unlock would find it and + * wake someone. + */ + if (waited) { + lockcnt_wake(lockcnt); + } +} + +void qemu_lockcnt_dec(QemuLockCnt *lockcnt) +{ + atomic_sub(&lockcnt->count, QEMU_LOCKCNT_COUNT_STEP); +} + +/* Decrement a counter, and return locked if it is decremented to zero. + * If the function returns true, it is impossible for the counter to + * become nonzero until the next qemu_lockcnt_unlock. + */ +bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt) +{ + int val = atomic_read(&lockcnt->count); + int locked_state = QEMU_LOCKCNT_STATE_LOCKED; + bool waited = false; + + for (;;) { + if (val >= 2 * QEMU_LOCKCNT_COUNT_STEP) { + int expected = val; + val = atomic_cmpxchg(&lockcnt->count, val, val - QEMU_LOCKCNT_COUNT_STEP); + if (val == expected) { + break; + } + } else { + /* If count is going 1->0, take the lock. The fast path is + * (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting). + */ + if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state, &waited)) { + return true; + } + + if (waited) { + /* At this point we do not know if there are more waiters. Assume + * there are. + */ + locked_state = QEMU_LOCKCNT_STATE_WAITING; + } + } + } + + /* If we were woken by another thread, but we're returning in unlocked + * state, we should also wake a thread because we are effectively + * releasing the lock that was given to us. This is the case where + * qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low + * bits, and qemu_lockcnt_unlock would find it and wake someone. + */ + if (waited) { + lockcnt_wake(lockcnt); + } + return false; +} + +/* If the counter is one, decrement it and return locked. Otherwise do + * nothing. 
+ * + * If the function returns true, it is impossible for the counter to + * become nonzero until the next qemu_lockcnt_unlock. + */ +bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt) +{ + int val = atomic_read(&lockcnt->count); + int locked_state = QEMU_LOCKCNT_STATE_LOCKED; + bool waited = false; + + while (val < 2 * QEMU_LOCKCNT_COUNT_STEP) { + /* If count is going 1->0, take the lock. The fast path is + * (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting). + */ + if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state, &waited)) { + return true; + } + + if (waited) { + /* At this point we do not know if there are more waiters. Assume + * there are. + */ + locked_state = QEMU_LOCKCNT_STATE_WAITING; + } + } + + /* If we were woken by another thread, but we're returning in unlocked + * state, we should also wake a thread because we are effectively + * releasing the lock that was given to us. This is the case where + * qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low + * bits, and qemu_lockcnt_inc_and_unlock would find it and wake someone. + */ + if (waited) { + lockcnt_wake(lockcnt); + } + return false; +} + +void qemu_lockcnt_lock(QemuLockCnt *lockcnt) +{ + int val = atomic_read(&lockcnt->count); + int step = QEMU_LOCKCNT_STATE_LOCKED; + bool waited = false; + + /* The third argument is only used if the low bits of val are 0 + * (QEMU_LOCKCNT_STATE_FREE), so just blindly mix in the desired + * state. + */ + while (!qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, val + step, &waited)) { + if (waited) { + /* At this point we do not know if there are more waiters. Assume + * there are. + */ + step = QEMU_LOCKCNT_STATE_WAITING; + } + } +} + +void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt) +{ + int expected, new, val; + + val = atomic_read(&lockcnt->count); + do { + expected = val; + new = (val + QEMU_LOCKCNT_COUNT_STEP) & ~QEMU_LOCKCNT_STATE_MASK; + trace_lockcnt_unlock_attempt(lockcnt, val, new); + val = atomic_cmpxchg(&lockcnt->count, val, new); + } while (val != expected); + + trace_lockcnt_unlock_success(lockcnt, val, new); + if (val & QEMU_LOCKCNT_STATE_WAITING) { + lockcnt_wake(lockcnt); + } +} + +void qemu_lockcnt_unlock(QemuLockCnt *lockcnt) +{ + int expected, new, val; + + val = atomic_read(&lockcnt->count); + do { + expected = val; + new = val & ~QEMU_LOCKCNT_STATE_MASK; + trace_lockcnt_unlock_attempt(lockcnt, val, new); + val = atomic_cmpxchg(&lockcnt->count, val, new); + } while (val != expected); + + trace_lockcnt_unlock_success(lockcnt, val, new); + if (val & QEMU_LOCKCNT_STATE_WAITING) { + lockcnt_wake(lockcnt); + } +} + +unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt) +{ + return atomic_read(&lockcnt->count) >> QEMU_LOCKCNT_COUNT_SHIFT; +} +#else +void qemu_lockcnt_init(QemuLockCnt *lockcnt) +{ + qemu_mutex_init(&lockcnt->mutex); + lockcnt->count = 0; +} + +void qemu_lockcnt_destroy(QemuLockCnt *lockcnt) +{ + qemu_mutex_destroy(&lockcnt->mutex); +} + +void qemu_lockcnt_inc(QemuLockCnt *lockcnt) +{ + int old; + for (;;) { + old = atomic_read(&lockcnt->count); + if (old == 0) { + qemu_lockcnt_lock(lockcnt); + qemu_lockcnt_inc_and_unlock(lockcnt); + return; + } else { + if (atomic_cmpxchg(&lockcnt->count, old, old + 1) == old) { + return; + } + } + } +} + +void qemu_lockcnt_dec(QemuLockCnt *lockcnt) +{ + atomic_dec(&lockcnt->count); +} + +/* Decrement a counter, and return locked if it is decremented to zero. + * It is impossible for the counter to become nonzero while the mutex + * is taken. 
+ */ +bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt) +{ + int val = atomic_read(&lockcnt->count); + while (val > 1) { + int old = atomic_cmpxchg(&lockcnt->count, val, val - 1); + if (old != val) { + val = old; + continue; + } + + return false; + } + + qemu_lockcnt_lock(lockcnt); + if (atomic_fetch_dec(&lockcnt->count) == 1) { + return true; + } + + qemu_lockcnt_unlock(lockcnt); + return false; +} + +/* Decrement a counter and return locked if it is decremented to zero. + * Otherwise do nothing. + * + * It is impossible for the counter to become nonzero while the mutex + * is taken. + */ +bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt) +{ + /* No need for acquire semantics if we return false. */ + int val = atomic_read(&lockcnt->count); + if (val > 1) { + return false; + } + + qemu_lockcnt_lock(lockcnt); + if (atomic_fetch_dec(&lockcnt->count) == 1) { + return true; + } + + qemu_lockcnt_inc_and_unlock(lockcnt); + return false; +} + +void qemu_lockcnt_lock(QemuLockCnt *lockcnt) +{ + qemu_mutex_lock(&lockcnt->mutex); +} + +void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt) +{ + atomic_inc(&lockcnt->count); + qemu_mutex_unlock(&lockcnt->mutex); +} + +void qemu_lockcnt_unlock(QemuLockCnt *lockcnt) +{ + qemu_mutex_unlock(&lockcnt->mutex); +} + +unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt) +{ + return atomic_read(&lockcnt->count); +} +#endif diff --git a/util/qemu-thread-posix.c b/util/qemu-thread-posix.c index d20cddec0cd7..37cd8ba3fea8 100644 --- a/util/qemu-thread-posix.c +++ b/util/qemu-thread-posix.c @@ -11,10 +11,6 @@ * */ #include "qemu/osdep.h" -#ifdef __linux__ -#include -#include -#endif #include "qemu/thread.h" #include "qemu/atomic.h" #include "qemu/notify.h" @@ -294,28 +290,9 @@ void qemu_sem_wait(QemuSemaphore *sem) } #ifdef __linux__ -#define futex(...) syscall(__NR_futex, __VA_ARGS__) - -static inline void futex_wake(QemuEvent *ev, int n) -{ - futex(ev, FUTEX_WAKE, n, NULL, NULL, 0); -} - -static inline void futex_wait(QemuEvent *ev, unsigned val) -{ - while (futex(ev, FUTEX_WAIT, (int) val, NULL, NULL, 0)) { - switch (errno) { - case EWOULDBLOCK: - return; - case EINTR: - break; /* get out of switch and retry */ - default: - abort(); - } - } -} +#include "qemu/futex.h" #else -static inline void futex_wake(QemuEvent *ev, int n) +static inline void qemu_futex_wake(QemuEvent *ev, int n) { pthread_mutex_lock(&ev->lock); if (n == 1) { @@ -326,7 +303,7 @@ static inline void futex_wake(QemuEvent *ev, int n) pthread_mutex_unlock(&ev->lock); } -static inline void futex_wait(QemuEvent *ev, unsigned val) +static inline void qemu_futex_wait(QemuEvent *ev, unsigned val) { pthread_mutex_lock(&ev->lock); if (ev->value == val) { @@ -338,7 +315,7 @@ static inline void futex_wait(QemuEvent *ev, unsigned val) /* Valid transitions: * - free->set, when setting the event - * - busy->set, when setting the event, followed by futex_wake + * - busy->set, when setting the event, followed by qemu_futex_wake * - set->free, when resetting the event * - free->busy, when waiting * @@ -381,7 +358,7 @@ void qemu_event_set(QemuEvent *ev) if (atomic_read(&ev->value) != EV_SET) { if (atomic_xchg(&ev->value, EV_SET) == EV_BUSY) { /* There were waiters, wake them up. 
*/ - futex_wake(ev, INT_MAX); + qemu_futex_wake(ev, INT_MAX); } } } @@ -419,7 +396,7 @@ void qemu_event_wait(QemuEvent *ev) return; } } - futex_wait(ev, EV_BUSY); + qemu_futex_wait(ev, EV_BUSY); } } diff --git a/util/qemu-thread-win32.c b/util/qemu-thread-win32.c index 728e76b5b296..178e0168a105 100644 --- a/util/qemu-thread-win32.c +++ b/util/qemu-thread-win32.c @@ -269,7 +269,7 @@ void qemu_sem_wait(QemuSemaphore *sem) * * Valid transitions: * - free->set, when setting the event - * - busy->set, when setting the event, followed by futex_wake + * - busy->set, when setting the event, followed by SetEvent * - set->free, when resetting the event * - free->busy, when waiting * diff --git a/util/trace-events b/util/trace-events index ed06aee2ec76..2b8aa3073928 100644 --- a/util/trace-events +++ b/util/trace-events @@ -30,3 +30,13 @@ qemu_anon_ram_free(void *ptr, size_t size) "ptr %p size %zu" hbitmap_iter_skip_words(const void *hb, void *hbi, uint64_t pos, unsigned long cur) "hb %p hbi %p pos %"PRId64" cur 0x%lx" hbitmap_reset(void *hb, uint64_t start, uint64_t count, uint64_t sbit, uint64_t ebit) "hb %p items %"PRIu64",%"PRIu64" bits %"PRIu64"..%"PRIu64 hbitmap_set(void *hb, uint64_t start, uint64_t count, uint64_t sbit, uint64_t ebit) "hb %p items %"PRIu64",%"PRIu64" bits %"PRIu64"..%"PRIu64 + +# util/lockcnt.c +lockcnt_fast_path_attempt(const void *lockcnt, int expected, int new) "lockcnt %p fast path %d->%d" +lockcnt_fast_path_success(const void *lockcnt, int expected, int new) "lockcnt %p fast path %d->%d succeeded" +lockcnt_unlock_attempt(const void *lockcnt, int expected, int new) "lockcnt %p unlock %d->%d" +lockcnt_unlock_success(const void *lockcnt, int expected, int new) "lockcnt %p unlock %d->%d succeeded" +lockcnt_futex_wait_prepare(const void *lockcnt, int expected, int new) "lockcnt %p preparing slow path %d->%d" +lockcnt_futex_wait(const void *lockcnt, int val) "lockcnt %p waiting on %d" +lockcnt_futex_wait_resume(const void *lockcnt, int new) "lockcnt %p after wait: %d" +lockcnt_futex_wake(const void *lockcnt) "lockcnt %p waking up one waiter"
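
A note on the Linux implementation in util/lockcnt.c: it packs everything
into a single word, with bits 0-1 holding the lock/futex state (FREE,
LOCKED or WAITING) and the remaining bits holding the counter, which is
why the code moves in steps of QEMU_LOCKCNT_COUNT_STEP and shifts by
QEMU_LOCKCNT_COUNT_SHIFT.  The standalone sketch below is illustration
only, not part of the patch; the constants are copied from lockcnt.c:

    /* Illustration only: how the Linux QemuLockCnt implementation encodes
     * the counter and the futex lock state in one word.
     */
    #include <assert.h>

    #define QEMU_LOCKCNT_STATE_MASK    3
    #define QEMU_LOCKCNT_STATE_WAITING 2   /* locked, contended */
    #define QEMU_LOCKCNT_COUNT_STEP    4
    #define QEMU_LOCKCNT_COUNT_SHIFT   2

    int main(void)
    {
        /* count == 3, lock held with waiters */
        int val = 3 * QEMU_LOCKCNT_COUNT_STEP + QEMU_LOCKCNT_STATE_WAITING;

        assert((val >> QEMU_LOCKCNT_COUNT_SHIFT) == 3);   /* qemu_lockcnt_count() */
        assert((val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_WAITING);

        val += QEMU_LOCKCNT_COUNT_STEP;                   /* qemu_lockcnt_inc() fast path */
        assert((val >> QEMU_LOCKCNT_COUNT_SHIFT) == 4);   /* count went up...          */
        assert((val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_WAITING);
                                                          /* ...lock bits untouched    */
        return 0;
    }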