Skip to content

Commit

Permalink
linux-aio: avoid deadlock in nested aio_poll() calls
Browse files Browse the repository at this point in the history
If two Linux AIO request completions are fetched in the same
io_getevents() call, QEMU will deadlock if request A's callback waits
for request B to complete using an aio_poll() loop.  This was reported
to happen with the mirror blockjob.

This patch moves completion processing into a BH and makes it resumable.
Nested event loops can resume completion processing so that request B
will complete and the deadlock will not occur.

Cc: Kevin Wolf <kwolf@redhat.com>
Cc: Paolo Bonzini <pbonzini@redhat.com>
Cc: Ming Lei <ming.lei@canonical.com>
Cc: Marcin Gibuła <m.gibula@beyond.pl>
Reported-by: Marcin Gibuła <m.gibula@beyond.pl>
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
Tested-by: Marcin Gibuła <m.gibula@beyond.pl>
  • Loading branch information
stefanhaRH committed Aug 29, 2014
1 parent 12ade76 commit 2cdff7f
Showing 1 changed file with 55 additions and 16 deletions.
71 changes: 55 additions & 16 deletions block/linux-aio.c
Expand Up @@ -51,6 +51,12 @@ struct qemu_laio_state {

/* io queue for submit at batch */
LaioQueue io_q;

/* I/O completion processing */
QEMUBH *completion_bh;
struct io_event events[MAX_EVENTS];
int event_idx;
int event_max;
};

static inline ssize_t io_event_ret(struct io_event *ev)
Expand Down Expand Up @@ -86,27 +92,58 @@ static void qemu_laio_process_completion(struct qemu_laio_state *s,
qemu_aio_release(laiocb);
}

static void qemu_laio_completion_cb(EventNotifier *e)
/* The completion BH fetches completed I/O requests and invokes their
* callbacks.
*
* The function is somewhat tricky because it supports nested event loops, for
* example when a request callback invokes aio_poll(). In order to do this,
* the completion events array and index are kept in qemu_laio_state. The BH
* reschedules itself as long as there are completions pending so it will
* either be called again in a nested event loop or will be called after all
* events have been completed. When there are no events left to complete, the
* BH returns without rescheduling.
*/
static void qemu_laio_completion_bh(void *opaque)
{
struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e);

while (event_notifier_test_and_clear(&s->e)) {
struct io_event events[MAX_EVENTS];
struct timespec ts = { 0 };
int nevents, i;
struct qemu_laio_state *s = opaque;

/* Fetch more completion events when empty */
if (s->event_idx == s->event_max) {
do {
nevents = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS, events, &ts);
} while (nevents == -EINTR);
struct timespec ts = { 0 };
s->event_max = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS,
s->events, &ts);
} while (s->event_max == -EINTR);

s->event_idx = 0;
if (s->event_max <= 0) {
s->event_max = 0;
return; /* no more events */
}
}

for (i = 0; i < nevents; i++) {
struct iocb *iocb = events[i].obj;
struct qemu_laiocb *laiocb =
container_of(iocb, struct qemu_laiocb, iocb);
/* Reschedule so nested event loops see currently pending completions */
qemu_bh_schedule(s->completion_bh);

laiocb->ret = io_event_ret(&events[i]);
qemu_laio_process_completion(s, laiocb);
}
/* Process completion events */
while (s->event_idx < s->event_max) {
struct iocb *iocb = s->events[s->event_idx].obj;
struct qemu_laiocb *laiocb =
container_of(iocb, struct qemu_laiocb, iocb);

laiocb->ret = io_event_ret(&s->events[s->event_idx]);
s->event_idx++;

qemu_laio_process_completion(s, laiocb);
}
}

static void qemu_laio_completion_cb(EventNotifier *e)
{
struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e);

if (event_notifier_test_and_clear(&s->e)) {
qemu_bh_schedule(s->completion_bh);
}
}

Expand Down Expand Up @@ -272,12 +309,14 @@ void laio_detach_aio_context(void *s_, AioContext *old_context)
struct qemu_laio_state *s = s_;

aio_set_event_notifier(old_context, &s->e, NULL);
qemu_bh_delete(s->completion_bh);
}

void laio_attach_aio_context(void *s_, AioContext *new_context)
{
struct qemu_laio_state *s = s_;

s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb);
}

Expand Down

0 comments on commit 2cdff7f

Please sign in to comment.