Showing with 665 additions and 364 deletions.
  1. +1 −3 backends/tpm/tpm_backend.c
  2. +1 −1 block.c
  3. +2 −2 block/blkdebug.c
  4. +27 −18 block/block-backend.c
  5. +1 −1 block/export/export.c
  6. +17 −28 block/file-posix.c
  7. +1 −3 block/file-win32.c
  8. +1 −1 block/graph-lock.c
  9. +1 −1 block/io.c
  10. +15 −8 block/io_uring.c
  11. +17 −12 block/linux-aio.c
  12. +2 −2 block/mirror.c
  13. +6 −4 block/monitor/block-hmp-cmds.c
  14. +1 −1 block/qcow2-bitmap.c
  15. +13 −8 block/qcow2-cluster.c
  16. +4 −4 block/qcow2-refcount.c
  17. +13 −12 block/qcow2-snapshot.c
  18. +1 −2 block/qcow2-threads.c
  19. +14 −13 block/qcow2.c
  20. +8 −7 block/qcow2.h
  21. +1 −1 block/vmdk.c
  22. +30 −28 block/vvfat.c
  23. +2 −3 docs/devel/qapi-code-gen.rst
  24. +2 −2 hw/9pfs/9p.h
  25. +3 −3 hw/9pfs/codir.c
  26. +1 −2 hw/9pfs/coth.c
  27. +2 −4 hw/ppc/spapr_nvdimm.c
  28. +1 −2 hw/virtio/virtio-pmem.c
  29. +1 −1 include/block/aio-wait.h
  30. +0 −8 include/block/aio.h
  31. +1 −1 include/block/block_int-common.h
  32. +23 −10 include/block/raw-aio.h
  33. +9 −6 include/block/thread-pool.h
  34. +5 −0 include/sysemu/block-backend-io.h
  35. +1 −1 monitor/hmp.c
  36. +2 −2 monitor/monitor.c
  37. +24 −24 nbd/server.c
  38. +1 −0 scripts/qapi/commands.py
  39. +61 −52 scripts/qapi/expr.py
  40. +1 −0 scripts/qapi/gen.py
  41. +1 −1 scripts/qapi/main.py
  42. +25 −6 scripts/qapi/schema.py
  43. +2 −0 scripts/qapi/visit.py
  44. +1 −2 scsi/pr-manager.c
  45. +12 −13 scsi/qemu-pr-helper.c
  46. +2 −0 tests/qapi-schema/args-if-implicit.err
  47. +4 −0 tests/qapi-schema/args-if-implicit.json
  48. 0 tests/qapi-schema/args-if-implicit.out
  49. +2 −0 tests/qapi-schema/args-if-unboxed.err
  50. +6 −0 tests/qapi-schema/args-if-unboxed.json
  51. 0 tests/qapi-schema/args-if-unboxed.out
  52. +1 −1 tests/qapi-schema/bad-data.err
  53. +2 −0 tests/qapi-schema/event-args-if-unboxed.err
  54. +4 −0 tests/qapi-schema/event-args-if-unboxed.json
  55. 0 tests/qapi-schema/event-args-if-unboxed.out
  56. +1 −1 tests/qapi-schema/event-nest-struct.err
  57. +5 −0 tests/qapi-schema/meson.build
  58. +1 −1 tests/qapi-schema/nested-struct-data.err
  59. +42 −10 tests/qapi-schema/qapi-schema-test.json
  60. +40 −21 tests/qapi-schema/qapi-schema-test.out
  61. +1 −1 tests/qapi-schema/returns-dict.err
  62. +2 −0 tests/qapi-schema/struct-data-typename.err
  63. +2 −0 tests/qapi-schema/struct-data-typename.json
  64. 0 tests/qapi-schema/struct-data-typename.out
  65. +1 −1 tests/qapi-schema/struct-member-invalid.err
  66. +1 −0 tests/qapi-schema/test-qapi.py
  67. +1 −1 tests/qapi-schema/union-array-branch.err
  68. +1 −1 tests/qapi-schema/union-invalid-discriminator.err
  69. +2 −2 tests/qapi-schema/union-invalid-discriminator.json
  70. +2 −0 tests/qapi-schema/union-invalid-union-subfield.err
  71. +30 −0 tests/qapi-schema/union-invalid-union-subfield.json
  72. 0 tests/qapi-schema/union-invalid-union-subfield.out
  73. +2 −0 tests/qapi-schema/union-invalid-union-subtype.err
  74. +29 −0 tests/qapi-schema/union-invalid-union-subtype.json
  75. 0 tests/qapi-schema/union-invalid-union-subtype.out
  76. +47 −0 tests/unit/test-qobject-input-visitor.c
  77. +58 −0 tests/unit/test-qobject-output-visitor.c
  78. +6 −8 tests/unit/test-thread-pool.c
  79. +12 −13 util/thread-pool.c
4 changes: 1 addition & 3 deletions backends/tpm/tpm_backend.c
Expand Up @@ -100,16 +100,14 @@ bool tpm_backend_had_startup_error(TPMBackend *s)

void tpm_backend_deliver_request(TPMBackend *s, TPMBackendCmd *cmd)
{
ThreadPool *pool = aio_get_thread_pool(qemu_get_aio_context());

if (s->cmd != NULL) {
error_report("There is a TPM request pending");
return;
}

s->cmd = cmd;
object_ref(OBJECT(s));
thread_pool_submit_aio(pool, tpm_backend_worker_thread, s,
thread_pool_submit_aio(tpm_backend_worker_thread, s,
tpm_backend_request_completed, s);
}

Expand Down
2 changes: 1 addition & 1 deletion block.c
Expand Up @@ -5750,7 +5750,7 @@ int bdrv_drop_intermediate(BlockDriverState *top, BlockDriverState *base,
* sums the size of all data-bearing children. (This excludes backing
* children.)
*/
static int64_t bdrv_sum_allocated_file_size(BlockDriverState *bs)
static int64_t coroutine_fn bdrv_sum_allocated_file_size(BlockDriverState *bs)
{
BdrvChild *child;
int64_t child_size, sum = 0;
Expand Down
4 changes: 2 additions & 2 deletions block/blkdebug.c
Expand Up @@ -583,8 +583,8 @@ static int blkdebug_open(BlockDriverState *bs, QDict *options, int flags,
return ret;
}

static int rule_check(BlockDriverState *bs, uint64_t offset, uint64_t bytes,
BlkdebugIOType iotype)
static int coroutine_fn rule_check(BlockDriverState *bs, uint64_t offset,
uint64_t bytes, BlkdebugIOType iotype)
{
BDRVBlkdebugState *s = bs->opaque;
BlkdebugRule *rule = NULL;
Expand Down
45 changes: 27 additions & 18 deletions block/block-backend.c
Expand Up @@ -80,9 +80,10 @@ struct BlockBackend {
NotifierList remove_bs_notifiers, insert_bs_notifiers;
QLIST_HEAD(, BlockBackendAioNotifier) aio_notifiers;

int quiesce_counter;
int quiesce_counter; /* atomic: written under BQL, read by other threads */
QemuMutex queued_requests_lock; /* protects queued_requests */
CoQueue queued_requests;
bool disable_request_queuing;
bool disable_request_queuing; /* atomic */

VMChangeStateEntry *vmsh;
bool force_allow_inactivate;
Expand Down Expand Up @@ -368,6 +369,7 @@ BlockBackend *blk_new(AioContext *ctx, uint64_t perm, uint64_t shared_perm)

block_acct_init(&blk->stats);

qemu_mutex_init(&blk->queued_requests_lock);
qemu_co_queue_init(&blk->queued_requests);
notifier_list_init(&blk->remove_bs_notifiers);
notifier_list_init(&blk->insert_bs_notifiers);
Expand Down Expand Up @@ -485,6 +487,8 @@ static void blk_delete(BlockBackend *blk)
assert(QLIST_EMPTY(&blk->remove_bs_notifiers.notifiers));
assert(QLIST_EMPTY(&blk->insert_bs_notifiers.notifiers));
assert(QLIST_EMPTY(&blk->aio_notifiers));
assert(qemu_co_queue_empty(&blk->queued_requests));
qemu_mutex_destroy(&blk->queued_requests_lock);
QTAILQ_REMOVE(&block_backends, blk, link);
drive_info_del(blk->legacy_dinfo);
block_acct_cleanup(&blk->stats);
Expand Down Expand Up @@ -1057,7 +1061,7 @@ void blk_set_dev_ops(BlockBackend *blk, const BlockDevOps *ops,
blk->dev_opaque = opaque;

/* Are we currently quiesced? Should we enforce this right now? */
if (blk->quiesce_counter && ops && ops->drained_begin) {
if (qatomic_read(&blk->quiesce_counter) && ops && ops->drained_begin) {
ops->drained_begin(opaque);
}
}
Expand Down Expand Up @@ -1232,7 +1236,7 @@ void blk_set_allow_aio_context_change(BlockBackend *blk, bool allow)
void blk_set_disable_request_queuing(BlockBackend *blk, bool disable)
{
IO_CODE();
blk->disable_request_queuing = disable;
qatomic_set(&blk->disable_request_queuing, disable);
}

static int coroutine_fn GRAPH_RDLOCK
Expand Down Expand Up @@ -1271,10 +1275,18 @@ static void coroutine_fn blk_wait_while_drained(BlockBackend *blk)
{
assert(blk->in_flight > 0);

if (blk->quiesce_counter && !blk->disable_request_queuing) {
if (qatomic_read(&blk->quiesce_counter) &&
!qatomic_read(&blk->disable_request_queuing)) {
/*
* Take lock before decrementing in flight counter so main loop thread
* waits for us to enqueue ourselves before it can leave the drained
* section.
*/
qemu_mutex_lock(&blk->queued_requests_lock);
blk_dec_in_flight(blk);
qemu_co_queue_wait(&blk->queued_requests, NULL);
qemu_co_queue_wait(&blk->queued_requests, &blk->queued_requests_lock);
blk_inc_in_flight(blk);
qemu_mutex_unlock(&blk->queued_requests_lock);
}
}

Expand Down Expand Up @@ -1862,14 +1874,8 @@ void blk_drain_all(void)
bdrv_drain_all_begin();

while ((blk = blk_all_next(blk)) != NULL) {
AioContext *ctx = blk_get_aio_context(blk);

aio_context_acquire(ctx);

/* We may have -ENOMEDIUM completions in flight */
AIO_WAIT_WHILE(ctx, qatomic_read(&blk->in_flight) > 0);

aio_context_release(ctx);
AIO_WAIT_WHILE_UNLOCKED(NULL, qatomic_read(&blk->in_flight) > 0);
}

bdrv_drain_all_end();
Expand Down Expand Up @@ -2595,7 +2601,7 @@ static void blk_root_drained_begin(BdrvChild *child)
BlockBackend *blk = child->opaque;
ThrottleGroupMember *tgm = &blk->public.throttle_group_member;

if (++blk->quiesce_counter == 1) {
if (qatomic_fetch_inc(&blk->quiesce_counter) == 0) {
if (blk->dev_ops && blk->dev_ops->drained_begin) {
blk->dev_ops->drained_begin(blk->dev_opaque);
}
Expand All @@ -2613,7 +2619,7 @@ static bool blk_root_drained_poll(BdrvChild *child)
{
BlockBackend *blk = child->opaque;
bool busy = false;
assert(blk->quiesce_counter);
assert(qatomic_read(&blk->quiesce_counter));

if (blk->dev_ops && blk->dev_ops->drained_poll) {
busy = blk->dev_ops->drained_poll(blk->dev_opaque);
Expand All @@ -2624,18 +2630,21 @@ static bool blk_root_drained_poll(BdrvChild *child)
static void blk_root_drained_end(BdrvChild *child)
{
BlockBackend *blk = child->opaque;
assert(blk->quiesce_counter);
assert(qatomic_read(&blk->quiesce_counter));

assert(blk->public.throttle_group_member.io_limits_disabled);
qatomic_dec(&blk->public.throttle_group_member.io_limits_disabled);

if (--blk->quiesce_counter == 0) {
if (qatomic_fetch_dec(&blk->quiesce_counter) == 1) {
if (blk->dev_ops && blk->dev_ops->drained_end) {
blk->dev_ops->drained_end(blk->dev_opaque);
}
while (qemu_co_enter_next(&blk->queued_requests, NULL)) {
qemu_mutex_lock(&blk->queued_requests_lock);
while (qemu_co_enter_next(&blk->queued_requests,
&blk->queued_requests_lock)) {
/* Resume all queued requests */
}
qemu_mutex_unlock(&blk->queued_requests_lock);
}
}

Expand Down
2 changes: 1 addition & 1 deletion block/export/export.c
Expand Up @@ -306,7 +306,7 @@ void blk_exp_close_all_type(BlockExportType type)
blk_exp_request_shutdown(exp);
}

AIO_WAIT_WHILE(NULL, blk_exp_has_type(type));
AIO_WAIT_WHILE_UNLOCKED(NULL, blk_exp_has_type(type));
}

void blk_exp_close_all(void)
Expand Down
45 changes: 17 additions & 28 deletions block/file-posix.c
Expand Up @@ -2040,12 +2040,9 @@ static int handle_aiocb_truncate(void *opaque)
return result;
}

static int coroutine_fn raw_thread_pool_submit(BlockDriverState *bs,
ThreadPoolFunc func, void *arg)
static int coroutine_fn raw_thread_pool_submit(ThreadPoolFunc func, void *arg)
{
/* @bs can be NULL, bdrv_get_aio_context() returns the main context then */
ThreadPool *pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
return thread_pool_submit_co(pool, func, arg);
return thread_pool_submit_co(func, arg);
}

/*
Expand Down Expand Up @@ -2089,16 +2086,13 @@ static int coroutine_fn raw_co_prw(BlockDriverState *bs, uint64_t offset,
type |= QEMU_AIO_MISALIGNED;
#ifdef CONFIG_LINUX_IO_URING
} else if (s->use_linux_io_uring) {
LuringState *aio = aio_get_linux_io_uring(bdrv_get_aio_context(bs));
assert(qiov->size == bytes);
return luring_co_submit(bs, aio, s->fd, offset, qiov, type);
return luring_co_submit(bs, s->fd, offset, qiov, type);
#endif
#ifdef CONFIG_LINUX_AIO
} else if (s->use_linux_aio) {
LinuxAioState *aio = aio_get_linux_aio(bdrv_get_aio_context(bs));
assert(qiov->size == bytes);
return laio_co_submit(bs, aio, s->fd, offset, qiov, type,
s->aio_max_batch);
return laio_co_submit(s->fd, offset, qiov, type, s->aio_max_batch);
#endif
}

Expand All @@ -2115,7 +2109,7 @@ static int coroutine_fn raw_co_prw(BlockDriverState *bs, uint64_t offset,
};

assert(qiov->size == bytes);
return raw_thread_pool_submit(bs, handle_aiocb_rw, &acb);
return raw_thread_pool_submit(handle_aiocb_rw, &acb);
}

static int coroutine_fn raw_co_preadv(BlockDriverState *bs, int64_t offset,
Expand All @@ -2137,14 +2131,12 @@ static void coroutine_fn raw_co_io_plug(BlockDriverState *bs)
BDRVRawState __attribute__((unused)) *s = bs->opaque;
#ifdef CONFIG_LINUX_AIO
if (s->use_linux_aio) {
LinuxAioState *aio = aio_get_linux_aio(bdrv_get_aio_context(bs));
laio_io_plug(bs, aio);
laio_io_plug();
}
#endif
#ifdef CONFIG_LINUX_IO_URING
if (s->use_linux_io_uring) {
LuringState *aio = aio_get_linux_io_uring(bdrv_get_aio_context(bs));
luring_io_plug(bs, aio);
luring_io_plug();
}
#endif
}
Expand All @@ -2154,14 +2146,12 @@ static void coroutine_fn raw_co_io_unplug(BlockDriverState *bs)
BDRVRawState __attribute__((unused)) *s = bs->opaque;
#ifdef CONFIG_LINUX_AIO
if (s->use_linux_aio) {
LinuxAioState *aio = aio_get_linux_aio(bdrv_get_aio_context(bs));
laio_io_unplug(bs, aio, s->aio_max_batch);
laio_io_unplug(s->aio_max_batch);
}
#endif
#ifdef CONFIG_LINUX_IO_URING
if (s->use_linux_io_uring) {
LuringState *aio = aio_get_linux_io_uring(bdrv_get_aio_context(bs));
luring_io_unplug(bs, aio);
luring_io_unplug();
}
#endif
}
Expand All @@ -2185,11 +2175,10 @@ static int coroutine_fn raw_co_flush_to_disk(BlockDriverState *bs)

#ifdef CONFIG_LINUX_IO_URING
if (s->use_linux_io_uring) {
LuringState *aio = aio_get_linux_io_uring(bdrv_get_aio_context(bs));
return luring_co_submit(bs, aio, s->fd, 0, NULL, QEMU_AIO_FLUSH);
return luring_co_submit(bs, s->fd, 0, NULL, QEMU_AIO_FLUSH);
}
#endif
return raw_thread_pool_submit(bs, handle_aiocb_flush, &acb);
return raw_thread_pool_submit(handle_aiocb_flush, &acb);
}

static void raw_aio_attach_aio_context(BlockDriverState *bs,
Expand Down Expand Up @@ -2251,7 +2240,7 @@ raw_regular_truncate(BlockDriverState *bs, int fd, int64_t offset,
},
};

return raw_thread_pool_submit(bs, handle_aiocb_truncate, &acb);
return raw_thread_pool_submit(handle_aiocb_truncate, &acb);
}

static int coroutine_fn raw_co_truncate(BlockDriverState *bs, int64_t offset,
Expand Down Expand Up @@ -3000,7 +2989,7 @@ raw_do_pdiscard(BlockDriverState *bs, int64_t offset, int64_t bytes,
acb.aio_type |= QEMU_AIO_BLKDEV;
}

ret = raw_thread_pool_submit(bs, handle_aiocb_discard, &acb);
ret = raw_thread_pool_submit(handle_aiocb_discard, &acb);
raw_account_discard(s, bytes, ret);
return ret;
}
Expand Down Expand Up @@ -3075,7 +3064,7 @@ raw_do_pwrite_zeroes(BlockDriverState *bs, int64_t offset, int64_t bytes,
handler = handle_aiocb_write_zeroes;
}

return raw_thread_pool_submit(bs, handler, &acb);
return raw_thread_pool_submit(handler, &acb);
}

static int coroutine_fn raw_co_pwrite_zeroes(
Expand Down Expand Up @@ -3313,7 +3302,7 @@ raw_co_copy_range_to(BlockDriverState *bs,
},
};

return raw_thread_pool_submit(bs, handle_aiocb_copy_range, &acb);
return raw_thread_pool_submit(handle_aiocb_copy_range, &acb);
}

BlockDriver bdrv_file = {
Expand Down Expand Up @@ -3643,7 +3632,7 @@ hdev_co_ioctl(BlockDriverState *bs, unsigned long int req, void *buf)
struct sg_io_hdr *io_hdr = buf;
if (io_hdr->cmdp[0] == PERSISTENT_RESERVE_OUT ||
io_hdr->cmdp[0] == PERSISTENT_RESERVE_IN) {
return pr_manager_execute(s->pr_mgr, bdrv_get_aio_context(bs),
return pr_manager_execute(s->pr_mgr, qemu_get_current_aio_context(),
s->fd, io_hdr);
}
}
Expand All @@ -3659,7 +3648,7 @@ hdev_co_ioctl(BlockDriverState *bs, unsigned long int req, void *buf)
},
};

return raw_thread_pool_submit(bs, handle_aiocb_ioctl, &acb);
return raw_thread_pool_submit(handle_aiocb_ioctl, &acb);
}
#endif /* linux */

Expand Down
4 changes: 1 addition & 3 deletions block/file-win32.c
Expand Up @@ -153,7 +153,6 @@ static BlockAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile,
BlockCompletionFunc *cb, void *opaque, int type)
{
RawWin32AIOData *acb = g_new(RawWin32AIOData, 1);
ThreadPool *pool;

acb->bs = bs;
acb->hfile = hfile;
Expand All @@ -168,8 +167,7 @@ static BlockAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile,
acb->aio_offset = offset;

trace_file_paio_submit(acb, opaque, offset, count, type);
pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
}

int qemu_ftruncate64(int fd, int64_t length)
Expand Down
2 changes: 1 addition & 1 deletion block/graph-lock.c
Expand Up @@ -127,7 +127,7 @@ void bdrv_graph_wrlock(void)
* reader lock.
*/
qatomic_set(&has_writer, 0);
AIO_WAIT_WHILE(qemu_get_aio_context(), reader_count() >= 1);
AIO_WAIT_WHILE_UNLOCKED(NULL, reader_count() >= 1);
qatomic_set(&has_writer, 1);

/*
Expand Down
2 changes: 1 addition & 1 deletion block/io.c
Expand Up @@ -524,7 +524,7 @@ void bdrv_drain_all_begin(void)
bdrv_drain_all_begin_nopoll();

/* Now poll the in-flight requests */
AIO_WAIT_WHILE(NULL, bdrv_drain_all_poll());
AIO_WAIT_WHILE_UNLOCKED(NULL, bdrv_drain_all_poll());

while ((bs = bdrv_next_all_states(bs))) {
bdrv_drain_assert_idle(bs);
Expand Down