Skip to content

Commit

Permalink
Merge remote-tracking branch 'remotes/stefanha/tags/block-pull-request' into staging

Browse files Browse the repository at this point in the history

Pull request

v2:
 * Rebased to resolve scsi conflicts

# gpg: Signature made Tue 21 Feb 2017 11:56:24 GMT
# gpg:                using RSA key 0x9CA4ABB381AB73C8
# gpg: Good signature from "Stefan Hajnoczi <stefanha@redhat.com>"
# gpg:                 aka "Stefan Hajnoczi <stefanha@gmail.com>"
# Primary key fingerprint: 8695 A8BF D3F9 7CDA AC35  775A 9CA4 ABB3 81AB 73C8

* remotes/stefanha/tags/block-pull-request: (24 commits)
  coroutine-lock: make CoRwlock thread-safe and fair
  coroutine-lock: add mutex argument to CoQueue APIs
  coroutine-lock: place CoMutex before CoQueue in header
  test-aio-multithread: add performance comparison with thread-based mutexes
  coroutine-lock: add limited spinning to CoMutex
  coroutine-lock: make CoMutex thread-safe
  block: document fields protected by AioContext lock
  async: remove unnecessary inc/dec pairs
  aio-posix: partially inline aio_dispatch into aio_poll
  block: explicitly acquire aiocontext in aio callbacks that need it
  block: explicitly acquire aiocontext in bottom halves that need it
  block: explicitly acquire aiocontext in callbacks that need it
  block: explicitly acquire aiocontext in timers that need it
  aio: push aio_context_acquire/release down to dispatching
  qed: introduce qed_aio_start_io and qed_aio_next_io_cb
  blkdebug: reschedule coroutine on the AioContext it is running on
  coroutine-lock: reschedule coroutine on the AioContext it was running on
  nbd: convert to use qio_channel_yield
  io: make qio_channel_yield aware of AioContexts
  io: add methods to set I/O handlers on AioContext
  ...

Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
  • Loading branch information
pm215 committed Feb 21, 2017
2 parents b856256 + a7b91d3 commit a0775e2
Show file tree
Hide file tree
Showing 67 changed files with 1,711 additions and 532 deletions.
4 changes: 0 additions & 4 deletions Makefile.objs
Expand Up @@ -9,12 +9,8 @@ chardev-obj-y = chardev/
#######################################################################
# block-obj-y is code used by both qemu system emulation and qemu-img

block-obj-y = async.o thread-pool.o
block-obj-y += nbd/
block-obj-y += block.o blockjob.o
block-obj-y += main-loop.o iohandler.o qemu-timer.o
block-obj-$(CONFIG_POSIX) += aio-posix.o
block-obj-$(CONFIG_WIN32) += aio-win32.o
block-obj-y += block/
block-obj-y += qemu-io-cmds.o
block-obj-$(CONFIG_REPLICATION) += replication.o
Expand Down
2 changes: 1 addition & 1 deletion block/backup.c
Expand Up @@ -64,7 +64,7 @@ static void coroutine_fn wait_for_overlapping_requests(BackupBlockJob *job,
retry = false;
QLIST_FOREACH(req, &job->inflight_reqs, list) {
if (end > req->start && start < req->end) {
qemu_co_queue_wait(&req->wait_queue);
qemu_co_queue_wait(&req->wait_queue, NULL);
retry = true;
break;
}
Expand Down
9 changes: 1 addition & 8 deletions block/blkdebug.c
Expand Up @@ -405,12 +405,6 @@ static int blkdebug_open(BlockDriverState *bs, QDict *options, int flags,
return ret;
}

static void error_callback_bh(void *opaque)
{
Coroutine *co = opaque;
qemu_coroutine_enter(co);
}

static int inject_error(BlockDriverState *bs, BlkdebugRule *rule)
{
BDRVBlkdebugState *s = bs->opaque;
Expand All @@ -423,8 +417,7 @@ static int inject_error(BlockDriverState *bs, BlkdebugRule *rule)
}

if (!immediately) {
aio_bh_schedule_oneshot(bdrv_get_aio_context(bs), error_callback_bh,
qemu_coroutine_self());
aio_co_schedule(qemu_get_current_aio_context(), qemu_coroutine_self());
qemu_coroutine_yield();
}

Expand Down
2 changes: 1 addition & 1 deletion block/blkreplay.c
Expand Up @@ -60,7 +60,7 @@ static int64_t blkreplay_getlength(BlockDriverState *bs)
static void blkreplay_bh_cb(void *opaque)
{
Request *req = opaque;
qemu_coroutine_enter(req->co);
aio_co_wake(req->co);
qemu_bh_delete(req->bh);
g_free(req);
}
Expand Down
13 changes: 8 additions & 5 deletions block/block-backend.c
Expand Up @@ -880,7 +880,6 @@ static int blk_prw(BlockBackend *blk, int64_t offset, uint8_t *buf,
{
QEMUIOVector qiov;
struct iovec iov;
Coroutine *co;
BlkRwCo rwco;

iov = (struct iovec) {
Expand All @@ -897,9 +896,14 @@ static int blk_prw(BlockBackend *blk, int64_t offset, uint8_t *buf,
.ret = NOT_DONE,
};

co = qemu_coroutine_create(co_entry, &rwco);
qemu_coroutine_enter(co);
BDRV_POLL_WHILE(blk_bs(blk), rwco.ret == NOT_DONE);
if (qemu_in_coroutine()) {
/* Fast-path if already in coroutine context */
co_entry(&rwco);
} else {
Coroutine *co = qemu_coroutine_create(co_entry, &rwco);
qemu_coroutine_enter(co);
BDRV_POLL_WHILE(blk_bs(blk), rwco.ret == NOT_DONE);
}

return rwco.ret;
}
Expand Down Expand Up @@ -979,7 +983,6 @@ static void blk_aio_complete(BlkAioEmAIOCB *acb)
static void blk_aio_complete_bh(void *opaque)
{
BlkAioEmAIOCB *acb = opaque;

assert(acb->has_returned);
blk_aio_complete(acb);
}
Expand Down
44 changes: 33 additions & 11 deletions block/curl.c
Expand Up @@ -386,9 +386,8 @@ static void curl_multi_check_completion(BDRVCURLState *s)
}
}

static void curl_multi_do(void *arg)
static void curl_multi_do_locked(CURLState *s)
{
CURLState *s = (CURLState *)arg;
CURLSocket *socket, *next_socket;
int running;
int r;
Expand All @@ -406,12 +405,23 @@ static void curl_multi_do(void *arg)
}
}

/*
 * File-descriptor handler for cURL socket activity.  Takes the
 * AioContext lock around the unlocked worker, since this callback can
 * run from an event loop that does not already hold it.
 */
static void curl_multi_do(void *arg)
{
    CURLState *state = arg;

    aio_context_acquire(state->s->aio_context);
    curl_multi_do_locked(state);
    aio_context_release(state->s->aio_context);
}

static void curl_multi_read(void *arg)
{
CURLState *s = (CURLState *)arg;

curl_multi_do(arg);
aio_context_acquire(s->s->aio_context);
curl_multi_do_locked(s);
curl_multi_check_completion(s->s);
aio_context_release(s->s->aio_context);
}

static void curl_multi_timeout_do(void *arg)
Expand All @@ -424,9 +434,11 @@ static void curl_multi_timeout_do(void *arg)
return;
}

aio_context_acquire(s->aio_context);
curl_multi_socket_action(s->multi, CURL_SOCKET_TIMEOUT, 0, &running);

curl_multi_check_completion(s);
aio_context_release(s->aio_context);
#else
abort();
#endif
Expand Down Expand Up @@ -784,31 +796,35 @@ static void curl_readv_bh_cb(void *p)
{
CURLState *state;
int running;
int ret = -EINPROGRESS;

CURLAIOCB *acb = p;
BDRVCURLState *s = acb->common.bs->opaque;
BlockDriverState *bs = acb->common.bs;
BDRVCURLState *s = bs->opaque;
AioContext *ctx = bdrv_get_aio_context(bs);

size_t start = acb->sector_num * BDRV_SECTOR_SIZE;
size_t end;

aio_context_acquire(ctx);

// In case we have the requested data already (e.g. read-ahead),
// we can just call the callback and be done.
switch (curl_find_buf(s, start, acb->nb_sectors * BDRV_SECTOR_SIZE, acb)) {
case FIND_RET_OK:
qemu_aio_unref(acb);
// fall through
case FIND_RET_WAIT:
return;
goto out;
default:
break;
}

// No cache found, so let's start a new request
state = curl_init_state(acb->common.bs, s);
if (!state) {
acb->common.cb(acb->common.opaque, -EIO);
qemu_aio_unref(acb);
return;
ret = -EIO;
goto out;
}

acb->start = 0;
Expand All @@ -822,9 +838,8 @@ static void curl_readv_bh_cb(void *p)
state->orig_buf = g_try_malloc(state->buf_len);
if (state->buf_len && state->orig_buf == NULL) {
curl_clean_state(state);
acb->common.cb(acb->common.opaque, -ENOMEM);
qemu_aio_unref(acb);
return;
ret = -ENOMEM;
goto out;
}
state->acb[0] = acb;

Expand All @@ -837,6 +852,13 @@ static void curl_readv_bh_cb(void *p)

/* Tell curl it needs to kick things off */
curl_multi_socket_action(s->multi, CURL_SOCKET_TIMEOUT, 0, &running);

out:
aio_context_release(ctx);
if (ret != -EINPROGRESS) {
acb->common.cb(acb->common.opaque, ret);
qemu_aio_unref(acb);
}
}

static BlockAIOCB *curl_aio_readv(BlockDriverState *bs,
Expand Down
9 changes: 1 addition & 8 deletions block/gluster.c
Expand Up @@ -698,13 +698,6 @@ static struct glfs *qemu_gluster_init(BlockdevOptionsGluster *gconf,
return qemu_gluster_glfs_init(gconf, errp);
}

/*
 * Bottom half scheduled from the GlusterFS callback thread; resumes the
 * coroutine that issued the request.
 */
static void qemu_gluster_complete_aio(void *opaque)
{
    GlusterAIOCB *acb = opaque;

    qemu_coroutine_enter(acb->coroutine);
}

/*
* AIO callback routine called from GlusterFS thread.
*/
Expand All @@ -720,7 +713,7 @@ static void gluster_finish_aiocb(struct glfs_fd *fd, ssize_t ret, void *arg)
acb->ret = -EIO; /* Partial read/write - fail it */
}

aio_bh_schedule_oneshot(acb->aio_context, qemu_gluster_complete_aio, acb);
aio_co_schedule(acb->aio_context, acb->coroutine);
}

static void qemu_gluster_parse_flags(int bdrv_flags, int *open_flags)
Expand Down
42 changes: 9 additions & 33 deletions block/io.c
Expand Up @@ -189,7 +189,7 @@ static void bdrv_co_drain_bh_cb(void *opaque)
bdrv_dec_in_flight(bs);
bdrv_drained_begin(bs);
data->done = true;
qemu_coroutine_enter(co);
aio_co_wake(co);
}

static void coroutine_fn bdrv_co_yield_to_drain(BlockDriverState *bs)
Expand Down Expand Up @@ -539,7 +539,7 @@ static bool coroutine_fn wait_serialising_requests(BdrvTrackedRequest *self)
* (instead of producing a deadlock in the former case). */
if (!req->waiting_for) {
self->waiting_for = req;
qemu_co_queue_wait(&req->wait_queue);
qemu_co_queue_wait(&req->wait_queue, NULL);
self->waiting_for = NULL;
retry = true;
waited = true;
Expand Down Expand Up @@ -813,7 +813,7 @@ static void bdrv_co_io_em_complete(void *opaque, int ret)
CoroutineIOCompletion *co = opaque;

co->ret = ret;
qemu_coroutine_enter(co->coroutine);
aio_co_wake(co->coroutine);
}

static int coroutine_fn bdrv_driver_preadv(BlockDriverState *bs,
Expand Down Expand Up @@ -2080,6 +2080,11 @@ void bdrv_aio_cancel(BlockAIOCB *acb)
if (acb->aiocb_info->get_aio_context) {
aio_poll(acb->aiocb_info->get_aio_context(acb), true);
} else if (acb->bs) {
/* qemu_aio_ref and qemu_aio_unref are not thread-safe, so
* assert that we're not using an I/O thread. Thread-safe
* code should use bdrv_aio_cancel_async exclusively.
*/
assert(bdrv_get_aio_context(acb->bs) == qemu_get_aio_context());
aio_poll(bdrv_get_aio_context(acb->bs), true);
} else {
abort();
Expand Down Expand Up @@ -2239,35 +2244,6 @@ BlockAIOCB *bdrv_aio_flush(BlockDriverState *bs,
return &acb->common;
}

/*
 * Allocate a BlockAIOCB of the size requested by @aiocb_info and fill in
 * its common fields.  The returned AIOCB carries a single reference,
 * owned by the caller; release it with qemu_aio_unref().
 */
void *qemu_aio_get(const AIOCBInfo *aiocb_info, BlockDriverState *bs,
                   BlockCompletionFunc *cb, void *opaque)
{
    BlockAIOCB *acb = g_malloc(aiocb_info->aiocb_size);

    acb->aiocb_info = aiocb_info;
    acb->bs = bs;
    acb->cb = cb;
    acb->opaque = opaque;
    acb->refcnt = 1;
    return acb;
}

void qemu_aio_ref(void *p)
{
BlockAIOCB *acb = p;
acb->refcnt++;
}

/*
 * Drop one reference on the AIOCB @p, freeing it when the count reaches
 * zero.  Asserts that the count was positive on entry.
 */
void qemu_aio_unref(void *p)
{
    BlockAIOCB *acb = p;

    assert(acb->refcnt > 0);
    acb->refcnt--;
    if (acb->refcnt == 0) {
        g_free(acb);
    }
}

/**************************************************************/
/* Coroutine block device emulation */

Expand Down Expand Up @@ -2299,7 +2275,7 @@ int coroutine_fn bdrv_co_flush(BlockDriverState *bs)

/* Wait until any previous flushes are completed */
while (bs->active_flush_req) {
qemu_co_queue_wait(&bs->flush_queue);
qemu_co_queue_wait(&bs->flush_queue, NULL);
}

bs->active_flush_req = true;
Expand Down
15 changes: 12 additions & 3 deletions block/iscsi.c
Expand Up @@ -165,16 +165,17 @@ iscsi_schedule_bh(IscsiAIOCB *acb)
static void iscsi_co_generic_bh_cb(void *opaque)
{
struct IscsiTask *iTask = opaque;

iTask->complete = 1;
qemu_coroutine_enter(iTask->co);
aio_co_wake(iTask->co);
}

static void iscsi_retry_timer_expired(void *opaque)
{
struct IscsiTask *iTask = opaque;
iTask->complete = 1;
if (iTask->co) {
qemu_coroutine_enter(iTask->co);
aio_co_wake(iTask->co);
}
}

Expand Down Expand Up @@ -394,8 +395,10 @@ iscsi_process_read(void *arg)
IscsiLun *iscsilun = arg;
struct iscsi_context *iscsi = iscsilun->iscsi;

aio_context_acquire(iscsilun->aio_context);
iscsi_service(iscsi, POLLIN);
iscsi_set_events(iscsilun);
aio_context_release(iscsilun->aio_context);
}

static void
Expand All @@ -404,8 +407,10 @@ iscsi_process_write(void *arg)
IscsiLun *iscsilun = arg;
struct iscsi_context *iscsi = iscsilun->iscsi;

aio_context_acquire(iscsilun->aio_context);
iscsi_service(iscsi, POLLOUT);
iscsi_set_events(iscsilun);
aio_context_release(iscsilun->aio_context);
}

static int64_t sector_lun2qemu(int64_t sector, IscsiLun *iscsilun)
Expand Down Expand Up @@ -1392,16 +1397,20 @@ static void iscsi_nop_timed_event(void *opaque)
{
IscsiLun *iscsilun = opaque;

aio_context_acquire(iscsilun->aio_context);
if (iscsi_get_nops_in_flight(iscsilun->iscsi) >= MAX_NOP_FAILURES) {
error_report("iSCSI: NOP timeout. Reconnecting...");
iscsilun->request_timed_out = true;
} else if (iscsi_nop_out_async(iscsilun->iscsi, NULL, NULL, 0, NULL) != 0) {
error_report("iSCSI: failed to sent NOP-Out. Disabling NOP messages.");
return;
goto out;
}

timer_mod(iscsilun->nop_timer, qemu_clock_get_ms(QEMU_CLOCK_REALTIME) + NOP_INTERVAL);
iscsi_set_events(iscsilun);

out:
aio_context_release(iscsilun->aio_context);
}

static void iscsi_readcapacity_sync(IscsiLun *iscsilun, Error **errp)
Expand Down

0 comments on commit a0775e2

Please sign in to comment.