Skip to content

Commit

Permalink
[XrdCeph] Fixed memory corruption in asynchronous read from ceph
Browse files Browse the repository at this point in the history
  • Loading branch information
Sebastien Ponce committed May 10, 2016
1 parent a37b4f4 commit e7d3551
Showing 1 changed file with 24 additions and 11 deletions.
35 changes: 24 additions & 11 deletions src/XrdCeph/XrdCephPosix.cc
Expand Up @@ -70,10 +70,12 @@ struct DirIterator {

/// small struct for aio API callbacks
struct AioArgs {
AioArgs(XrdSfsAio* a, AioCB *b, size_t n) : aiop(a), callback(b), nbBytes(n) {}
AioArgs(XrdSfsAio* a, AioCB *b, size_t n, ceph::bufferlist *_bl=0) :
aiop(a), callback(b), nbBytes(n), bl(_bl) {}
XrdSfsAio* aiop;
AioCB *callback;
size_t nbBytes;
ceph::bufferlist *bl;
};

/// global variables holding stripers and ioCtxs for each ceph pool plus the cluster object
Expand Down Expand Up @@ -636,7 +638,7 @@ ssize_t ceph_posix_pwrite(int fd, const void *buf, size_t count, off64_t offset)
}
}

static void ceph_aio_complete(rados_completion_t c, void *arg) {
static void ceph_aio_write_complete(rados_completion_t c, void *arg) {
AioArgs *awa = reinterpret_cast<AioArgs*>(arg);
size_t rc = rados_aio_get_return_value(c);
awa->callback(awa->aiop, rc == 0 ? awa->nbBytes : rc);
Expand Down Expand Up @@ -665,14 +667,14 @@ ssize_t ceph_aio_write(int fd, XrdSfsAio *aiop, AioCB *cb) {
// prepare a ceph AioCompletion object and do async call
AioArgs *args = new AioArgs(aiop, cb, count);
librados::AioCompletion *completion =
g_cluster->aio_create_completion(args, ceph_aio_complete, NULL);
g_cluster->aio_create_completion(args, ceph_aio_write_complete, NULL);
int rc = striper->aio_write(fr->name, completion, bl, count, offset);
completion->release();
return rc;
} else {
return -EBADF;
}
}
}

ssize_t ceph_posix_read(int fd, void *buf, size_t count) {
CephFileRef* fr = getFileRef(fd);
Expand Down Expand Up @@ -717,12 +719,25 @@ ssize_t ceph_posix_pread(int fd, void *buf, size_t count, off64_t offset) {
}
}

static void ceph_aio_read_complete(rados_completion_t c, void *arg) {
AioArgs *awa = reinterpret_cast<AioArgs*>(arg);
size_t rc = rados_aio_get_return_value(c);
if (awa->bl) {
if (rc > 0) {
awa->bl->copy(0, rc, (char*)awa->aiop->sfsAio.aio_buf);
}
delete awa->bl;
awa->bl = 0;
}
awa->callback(awa->aiop, rc == 0 ? awa->nbBytes : rc);
delete(awa);
}

ssize_t ceph_aio_read(int fd, XrdSfsAio *aiop, AioCB *cb) {
CephFileRef* fr = getFileRef(fd);
if (fr) {
// get the parameters from the Xroot aio object
size_t count = aiop->sfsAio.aio_nbytes;
const char *buf = (const char*)aiop->sfsAio.aio_buf;
size_t offset = aiop->sfsAio.aio_offset;
// get the striper object
logwrapper((char*)"ceph_read: for fd %d, count=%d", fd, count);
Expand All @@ -734,15 +749,13 @@ ssize_t ceph_aio_read(int fd, XrdSfsAio *aiop, AioCB *cb) {
return -EINVAL;
}
// prepare a bufferlist to receive data
ceph::bufferlist bl;
ceph::bufferlist *bl = new ceph::bufferlist();
// prepare a ceph AioCompletion object and do async call
AioArgs *args = new AioArgs(aiop, cb, count);
AioArgs *args = new AioArgs(aiop, cb, count, bl);
librados::AioCompletion *completion =
g_cluster->aio_create_completion(args, ceph_aio_complete, NULL);
int rc = striper->aio_read(fr->name, completion, &bl, count, offset);
g_cluster->aio_create_completion(args, ceph_aio_read_complete, NULL);
int rc = striper->aio_read(fr->name, completion, bl, count, offset);
completion->release();
if (rc < 0) return rc;
bl.copy(0, rc, (char*)buf);
return rc;
} else {
return -EBADF;
Expand Down

0 comments on commit e7d3551

Please sign in to comment.