Skip to content

Commit

Permalink
io: follow coroutine AioContext in qio_channel_yield()
Browse files Browse the repository at this point in the history
The ongoing QEMU multi-queue block layer effort makes it possible for multiple
threads to process I/O in parallel. The nbd block driver is not compatible with
the multi-queue block layer yet because QIOChannel cannot be used easily from
coroutines running in multiple threads. This series changes the QIOChannel API
to make that possible.

In the current API, calling qio_channel_attach_aio_context() sets the
AioContext where qio_channel_yield() installs an fd handler prior to yielding:

  qio_channel_attach_aio_context(ioc, my_ctx);
  ...
  qio_channel_yield(ioc); // my_ctx is used here
  ...
  qio_channel_detach_aio_context(ioc);

This API design has limitations: reading and writing must be done in the same
AioContext and moving between AioContexts involves a cumbersome sequence of API
calls that is not suitable for doing on a per-request basis.

There is no fundamental reason why a QIOChannel needs to run within the
same AioContext every time qio_channel_yield() is called. QIOChannel
only uses the AioContext while inside qio_channel_yield(). The rest of
the time, QIOChannel is independent of any AioContext.

In the new API, qio_channel_yield() queries the AioContext from the current
coroutine using qemu_coroutine_get_aio_context(). There is no need to
explicitly attach/detach AioContexts anymore and
qio_channel_attach_aio_context() and qio_channel_detach_aio_context() are gone.
One coroutine can read from the QIOChannel while another coroutine writes from
a different AioContext.

This API change allows the nbd block driver to use QIOChannel from any thread.
It's important to keep in mind that the block driver already synchronizes
QIOChannel access and ensures that two coroutines never read simultaneously or
write simultaneously.

This patch updates all users of qio_channel_attach_aio_context() to the
new API. Most conversions are simple, but vhost-user-server requires a
new qemu_coroutine_yield() call to quiesce the vu_client_trip()
coroutine when not attached to any AioContext.

While the API is has become simpler, there is one wart: QIOChannel has a
special case for the iohandler AioContext (used for handlers that must not run
in nested event loops). I didn't find an elegant way preserve that behavior, so
I added a new API called qio_channel_set_follow_coroutine_ctx(ioc, true|false)
for opting in to the new AioContext model. By default QIOChannel uses the
iohandler AioHandler. Code that formerly called
qio_channel_attach_aio_context() now calls
qio_channel_set_follow_coroutine_ctx(ioc, true) once after the QIOChannel is
created.

Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
Reviewed-by: Eric Blake <eblake@redhat.com>
Acked-by: Daniel P. Berrangé <berrange@redhat.com>
Message-ID: <20230830224802.493686-5-stefanha@redhat.com>
[eblake: also fix migration/rdma.c]
Signed-off-by: Eric Blake <eblake@redhat.com>
  • Loading branch information
stefanhaRH authored and ebblake committed Sep 8, 2023
1 parent acd4be6 commit 06e0f09
Show file tree
Hide file tree
Showing 16 changed files with 226 additions and 126 deletions.
11 changes: 1 addition & 10 deletions block/nbd.c
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
}

qio_channel_set_blocking(s->ioc, false, NULL);
qio_channel_attach_aio_context(s->ioc, bdrv_get_aio_context(bs));
qio_channel_set_follow_coroutine_ctx(s->ioc, true);

/* successfully connected */
WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
Expand Down Expand Up @@ -397,7 +397,6 @@ static void coroutine_fn GRAPH_RDLOCK nbd_reconnect_attempt(BDRVNBDState *s)

/* Finalize previous connection if any */
if (s->ioc) {
qio_channel_detach_aio_context(s->ioc);
yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
nbd_yank, s->bs);
object_unref(OBJECT(s->ioc));
Expand Down Expand Up @@ -2089,10 +2088,6 @@ static void nbd_attach_aio_context(BlockDriverState *bs,
* the reconnect_delay_timer cannot be active here.
*/
assert(!s->reconnect_delay_timer);

if (s->ioc) {
qio_channel_attach_aio_context(s->ioc, new_context);
}
}

static void nbd_detach_aio_context(BlockDriverState *bs)
Expand All @@ -2101,10 +2096,6 @@ static void nbd_detach_aio_context(BlockDriverState *bs)

assert(!s->open_timer);
assert(!s->reconnect_delay_timer);

if (s->ioc) {
qio_channel_detach_aio_context(s->ioc);
}
}

static BlockDriver bdrv_nbd = {
Expand Down
23 changes: 23 additions & 0 deletions include/io/channel-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,27 @@
QIOChannel *qio_channel_new_fd(int fd,
Error **errp);

/**
* qio_channel_util_set_aio_fd_handler:
* @read_fd: the file descriptor for the read handler
* @read_ctx: the AioContext for the read handler
* @io_read: the read handler
* @write_fd: the file descriptor for the write handler
* @write_ctx: the AioContext for the write handler
* @io_write: the write handler
* @opaque: the opaque argument to the read and write handler
*
* Set the read and write handlers when @read_ctx and @write_ctx are non-NULL,
* respectively. To leave a handler in its current state, pass a NULL
* AioContext. To clear a handler, pass a non-NULL AioContext and a NULL
* handler.
*/
void qio_channel_util_set_aio_fd_handler(int read_fd,
AioContext *read_ctx,
IOHandler *io_read,
int write_fd,
AioContext *write_ctx,
IOHandler *io_write,
void *opaque);

#endif /* QIO_CHANNEL_UTIL_H */
69 changes: 30 additions & 39 deletions include/io/channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,11 @@ struct QIOChannel {
Object parent;
unsigned int features; /* bitmask of QIOChannelFeatures */
char *name;
AioContext *ctx;
AioContext *read_ctx;
Coroutine *read_coroutine;
AioContext *write_ctx;
Coroutine *write_coroutine;
bool follow_coroutine_ctx;
#ifdef _WIN32
HANDLE event; /* For use with GSource on Win32 */
#endif
Expand Down Expand Up @@ -140,8 +142,9 @@ struct QIOChannelClass {
int whence,
Error **errp);
void (*io_set_aio_fd_handler)(QIOChannel *ioc,
AioContext *ctx,
AioContext *read_ctx,
IOHandler *io_read,
AioContext *write_ctx,
IOHandler *io_write,
void *opaque);
int (*io_flush)(QIOChannel *ioc,
Expand Down Expand Up @@ -498,6 +501,21 @@ int qio_channel_set_blocking(QIOChannel *ioc,
bool enabled,
Error **errp);

/**
* qio_channel_set_follow_coroutine_ctx:
* @ioc: the channel object
* @enabled: whether or not to follow the coroutine's AioContext
*
* If @enabled is true, calls to qio_channel_yield() use the current
* coroutine's AioContext. Usually this is desirable.
*
* If @enabled is false, calls to qio_channel_yield() use the global iohandler
* AioContext. This is may be used by coroutines that run in the main loop and
* do not wish to respond to I/O during nested event loops. This is the
* default for compatibility with code that is not aware of AioContexts.
*/
void qio_channel_set_follow_coroutine_ctx(QIOChannel *ioc, bool enabled);

/**
* qio_channel_close:
* @ioc: the channel object
Expand Down Expand Up @@ -703,41 +721,6 @@ GSource *qio_channel_add_watch_source(QIOChannel *ioc,
GDestroyNotify notify,
GMainContext *context);

/**
* qio_channel_attach_aio_context:
* @ioc: the channel object
* @ctx: the #AioContext to set the handlers on
*
* Request that qio_channel_yield() sets I/O handlers on
* the given #AioContext. If @ctx is %NULL, qio_channel_yield()
* uses QEMU's main thread event loop.
*
* You can move a #QIOChannel from one #AioContext to another even if
* I/O handlers are set for a coroutine. However, #QIOChannel provides
* no synchronization between the calls to qio_channel_yield() and
* qio_channel_attach_aio_context().
*
* Therefore you should first call qio_channel_detach_aio_context()
* to ensure that the coroutine is not entered concurrently. Then,
* while the coroutine has yielded, call qio_channel_attach_aio_context(),
* and then aio_co_schedule() to place the coroutine on the new
* #AioContext. The calls to qio_channel_detach_aio_context()
* and qio_channel_attach_aio_context() should be protected with
* aio_context_acquire() and aio_context_release().
*/
void qio_channel_attach_aio_context(QIOChannel *ioc,
AioContext *ctx);

/**
* qio_channel_detach_aio_context:
* @ioc: the channel object
*
* Disable any I/O handlers set by qio_channel_yield(). With the
* help of aio_co_schedule(), this allows moving a coroutine that was
* paused by qio_channel_yield() to another context.
*/
void qio_channel_detach_aio_context(QIOChannel *ioc);

/**
* qio_channel_yield:
* @ioc: the channel object
Expand Down Expand Up @@ -785,19 +768,27 @@ void qio_channel_wait(QIOChannel *ioc,
/**
* qio_channel_set_aio_fd_handler:
* @ioc: the channel object
* @ctx: the AioContext to set the handlers on
* @read_ctx: the AioContext to set the read handler on or NULL
* @io_read: the read handler
* @write_ctx: the AioContext to set the write handler on or NULL
* @io_write: the write handler
* @opaque: the opaque value passed to the handler
*
* This is used internally by qio_channel_yield(). It can
* be used by channel implementations to forward the handlers
* to another channel (e.g. from #QIOChannelTLS to the
* underlying socket).
*
* When @read_ctx is NULL, don't touch the read handler. When @write_ctx is
* NULL, don't touch the write handler. Note that setting the read handler
* clears the write handler, and vice versa, if they share the same AioContext.
* Therefore the caller must pass both handlers together when sharing the same
* AioContext.
*/
void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
AioContext *ctx,
AioContext *read_ctx,
IOHandler *io_read,
AioContext *write_ctx,
IOHandler *io_write,
void *opaque);

Expand Down
1 change: 1 addition & 0 deletions include/qemu/vhost-user-server.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ typedef struct {
unsigned int in_flight; /* atomic */

/* Protected by ctx lock */
bool in_qio_channel_yield;
bool wait_idle;
VuDev vu_dev;
QIOChannel *ioc; /* The I/O channel with the client */
Expand Down
10 changes: 7 additions & 3 deletions io/channel-command.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include "qemu/osdep.h"
#include "io/channel-command.h"
#include "io/channel-util.h"
#include "io/channel-watch.h"
#include "qapi/error.h"
#include "qemu/module.h"
Expand Down Expand Up @@ -331,14 +332,17 @@ static int qio_channel_command_close(QIOChannel *ioc,


static void qio_channel_command_set_aio_fd_handler(QIOChannel *ioc,
AioContext *ctx,
AioContext *read_ctx,
IOHandler *io_read,
AioContext *write_ctx,
IOHandler *io_write,
void *opaque)
{
QIOChannelCommand *cioc = QIO_CHANNEL_COMMAND(ioc);
aio_set_fd_handler(ctx, cioc->readfd, io_read, NULL, NULL, NULL, opaque);
aio_set_fd_handler(ctx, cioc->writefd, NULL, io_write, NULL, NULL, opaque);

qio_channel_util_set_aio_fd_handler(cioc->readfd, read_ctx, io_read,
cioc->writefd, write_ctx, io_write,
opaque);
}


Expand Down
9 changes: 7 additions & 2 deletions io/channel-file.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include "qemu/osdep.h"
#include "io/channel-file.h"
#include "io/channel-util.h"
#include "io/channel-watch.h"
#include "qapi/error.h"
#include "qemu/module.h"
Expand Down Expand Up @@ -192,13 +193,17 @@ static int qio_channel_file_close(QIOChannel *ioc,


static void qio_channel_file_set_aio_fd_handler(QIOChannel *ioc,
AioContext *ctx,
AioContext *read_ctx,
IOHandler *io_read,
AioContext *write_ctx,
IOHandler *io_write,
void *opaque)
{
QIOChannelFile *fioc = QIO_CHANNEL_FILE(ioc);
aio_set_fd_handler(ctx, fioc->fd, io_read, io_write, NULL, NULL, opaque);

qio_channel_util_set_aio_fd_handler(fioc->fd, read_ctx, io_read,
fioc->fd, write_ctx, io_write,
opaque);
}

static GSource *qio_channel_file_create_watch(QIOChannel *ioc,
Expand Down
3 changes: 2 additions & 1 deletion io/channel-null.c
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,9 @@ qio_channel_null_close(QIOChannel *ioc,

static void
qio_channel_null_set_aio_fd_handler(QIOChannel *ioc G_GNUC_UNUSED,
AioContext *ctx G_GNUC_UNUSED,
AioContext *read_ctx G_GNUC_UNUSED,
IOHandler *io_read G_GNUC_UNUSED,
AioContext *write_ctx G_GNUC_UNUSED,
IOHandler *io_write G_GNUC_UNUSED,
void *opaque G_GNUC_UNUSED)
{
Expand Down
9 changes: 7 additions & 2 deletions io/channel-socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "qapi/qapi-visit-sockets.h"
#include "qemu/module.h"
#include "io/channel-socket.h"
#include "io/channel-util.h"
#include "io/channel-watch.h"
#include "trace.h"
#include "qapi/clone-visitor.h"
Expand Down Expand Up @@ -893,13 +894,17 @@ qio_channel_socket_shutdown(QIOChannel *ioc,
}

static void qio_channel_socket_set_aio_fd_handler(QIOChannel *ioc,
AioContext *ctx,
AioContext *read_ctx,
IOHandler *io_read,
AioContext *write_ctx,
IOHandler *io_write,
void *opaque)
{
QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
aio_set_fd_handler(ctx, sioc->fd, io_read, io_write, NULL, NULL, opaque);

qio_channel_util_set_aio_fd_handler(sioc->fd, read_ctx, io_read,
sioc->fd, write_ctx, io_write,
opaque);
}

static GSource *qio_channel_socket_create_watch(QIOChannel *ioc,
Expand Down
6 changes: 4 additions & 2 deletions io/channel-tls.c
Original file line number Diff line number Diff line change
Expand Up @@ -388,14 +388,16 @@ static int qio_channel_tls_close(QIOChannel *ioc,
}

static void qio_channel_tls_set_aio_fd_handler(QIOChannel *ioc,
AioContext *ctx,
AioContext *read_ctx,
IOHandler *io_read,
AioContext *write_ctx,
IOHandler *io_write,
void *opaque)
{
QIOChannelTLS *tioc = QIO_CHANNEL_TLS(ioc);

qio_channel_set_aio_fd_handler(tioc->master, ctx, io_read, io_write, opaque);
qio_channel_set_aio_fd_handler(tioc->master, read_ctx, io_read,
write_ctx, io_write, opaque);
}

typedef struct QIOChannelTLSSource QIOChannelTLSSource;
Expand Down
24 changes: 24 additions & 0 deletions io/channel-util.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,27 @@ QIOChannel *qio_channel_new_fd(int fd,
}
return ioc;
}


void qio_channel_util_set_aio_fd_handler(int read_fd,
AioContext *read_ctx,
IOHandler *io_read,
int write_fd,
AioContext *write_ctx,
IOHandler *io_write,
void *opaque)
{
if (read_fd == write_fd && read_ctx == write_ctx) {
aio_set_fd_handler(read_ctx, read_fd, io_read, io_write,
NULL, NULL, opaque);
} else {
if (read_ctx) {
aio_set_fd_handler(read_ctx, read_fd, io_read, NULL,
NULL, NULL, opaque);
}
if (write_ctx) {
aio_set_fd_handler(write_ctx, write_fd, NULL, io_write,
NULL, NULL, opaque);
}
}
}

0 comments on commit 06e0f09

Please sign in to comment.