Skip to content

Commit

Permalink
sheepdog: multiplex the rw FD to flush cache
Browse files Browse the repository at this point in the history
This will reduce sockfds connected to the sheep server to one, which simply the
future hacks.

Cc: MORITA Kazutaka <morita.kazutaka@lab.ntt.co.jp>
Cc: Kevin Wolf <kwolf@redhat.com>
Cc: Stefan Hajnoczi <stefanha@redhat.com>
Signed-off-by: Liu Yuan <tailai.ly@taobao.com>
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
  • Loading branch information
liuy authored and stefanhaRH committed Jan 15, 2013
1 parent df702c9 commit 4778307
Showing 1 changed file with 37 additions and 45 deletions.
82 changes: 37 additions & 45 deletions block/sheepdog.c
Expand Up @@ -266,6 +266,7 @@ typedef struct AIOReq {
enum AIOCBState {
AIOCB_WRITE_UDATA,
AIOCB_READ_UDATA,
AIOCB_FLUSH_CACHE,
};

struct SheepdogAIOCB {
Expand Down Expand Up @@ -299,7 +300,6 @@ typedef struct BDRVSheepdogState {
char *addr;
char *port;
int fd;
int flush_fd;

CoMutex lock;
Coroutine *co_send;
Expand Down Expand Up @@ -736,6 +736,13 @@ static void coroutine_fn aio_read_response(void *opaque)
goto out;
}
break;
case AIOCB_FLUSH_CACHE:
if (rsp.result == SD_RES_INVALID_PARMS) {
dprintf("disable cache since the server doesn't support it\n");
s->cache_flags = SD_FLAG_CMD_DIRECT;
rsp.result = SD_RES_SUCCESS;
}
break;
}

if (rsp.result != SD_RES_SUCCESS) {
Expand Down Expand Up @@ -950,7 +957,7 @@ static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
{
int nr_copies = s->inode.nr_copies;
SheepdogObjReq hdr;
unsigned int wlen;
unsigned int wlen = 0;
int ret;
uint64_t oid = aio_req->oid;
unsigned int datalen = aio_req->data_len;
Expand All @@ -964,18 +971,23 @@ static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,

memset(&hdr, 0, sizeof(hdr));

if (aiocb_type == AIOCB_READ_UDATA) {
wlen = 0;
switch (aiocb_type) {
case AIOCB_FLUSH_CACHE:
hdr.opcode = SD_OP_FLUSH_VDI;
break;
case AIOCB_READ_UDATA:
hdr.opcode = SD_OP_READ_OBJ;
hdr.flags = flags;
} else if (create) {
wlen = datalen;
hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
hdr.flags = SD_FLAG_CMD_WRITE | flags;
} else {
break;
case AIOCB_WRITE_UDATA:
if (create) {
hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
} else {
hdr.opcode = SD_OP_WRITE_OBJ;
}
wlen = datalen;
hdr.opcode = SD_OP_WRITE_OBJ;
hdr.flags = SD_FLAG_CMD_WRITE | flags;
break;
}

if (s->cache_flags) {
Expand Down Expand Up @@ -1127,15 +1139,6 @@ static int sd_open(BlockDriverState *bs, const char *filename, int flags)
s->cache_flags = SD_FLAG_CMD_DIRECT;
}

if (s->cache_flags == SD_FLAG_CMD_CACHE) {
s->flush_fd = connect_to_sdog(s->addr, s->port);
if (s->flush_fd < 0) {
error_report("failed to connect");
ret = s->flush_fd;
goto out;
}
}

if (snapid || tag[0] != '\0') {
dprintf("%" PRIx32 " snapshot inode was open.\n", vid);
s->is_snapshot = true;
Expand Down Expand Up @@ -1397,9 +1400,6 @@ static void sd_close(BlockDriverState *bs)

qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL, NULL);
closesocket(s->fd);
if (s->cache_flags) {
closesocket(s->flush_fd);
}
g_free(s->addr);
}

Expand Down Expand Up @@ -1711,39 +1711,31 @@ static coroutine_fn int sd_co_readv(BlockDriverState *bs, int64_t sector_num,
static int coroutine_fn sd_co_flush_to_disk(BlockDriverState *bs)
{
BDRVSheepdogState *s = bs->opaque;
SheepdogObjReq hdr = { 0 };
SheepdogObjRsp *rsp = (SheepdogObjRsp *)&hdr;
SheepdogInode *inode = &s->inode;
SheepdogAIOCB *acb;
AIOReq *aio_req;
int ret;
unsigned int wlen = 0, rlen = 0;

if (s->cache_flags != SD_FLAG_CMD_CACHE) {
return 0;
}

hdr.opcode = SD_OP_FLUSH_VDI;
hdr.oid = vid_to_vdi_oid(inode->vdi_id);
acb = sd_aio_setup(bs, NULL, 0, 0, NULL, NULL);
acb->aiocb_type = AIOCB_FLUSH_CACHE;
acb->aio_done_func = sd_finish_aiocb;

ret = do_req(s->flush_fd, (SheepdogReq *)&hdr, NULL, &wlen, &rlen);
if (ret) {
error_report("failed to send a request to the sheep");
aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
0, 0, 0, 0, 0);
QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
ret = add_aio_request(s, aio_req, NULL, 0, false, acb->aiocb_type);
if (ret < 0) {
error_report("add_aio_request is failed");
free_aio_req(s, aio_req);
qemu_aio_release(acb);
return ret;
}

if (rsp->result == SD_RES_INVALID_PARMS) {
dprintf("disable write cache since the server doesn't support it\n");

s->cache_flags = SD_FLAG_CMD_DIRECT;
closesocket(s->flush_fd);
return 0;
}

if (rsp->result != SD_RES_SUCCESS) {
error_report("%s", sd_strerror(rsp->result));
return -EIO;
}

return 0;
qemu_coroutine_yield();
return acb->ret;
}

static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
Expand Down

0 comments on commit 4778307

Please sign in to comment.