diff --git a/src/XrdCeph/XrdCephOssFile.cc b/src/XrdCeph/XrdCephOssFile.cc index 9fc7345a05e..2b9fb065f17 100644 --- a/src/XrdCeph/XrdCephOssFile.cc +++ b/src/XrdCeph/XrdCephOssFile.cc @@ -66,8 +66,13 @@ ssize_t XrdCephOssFile::Read(void *buff, off_t offset, size_t blen) { return rc; } -int XrdCephOssFile::Read(XrdSfsAio *aoip) { - return -ENOTSUP; +static void aioReadCallback(XrdSfsAio *aiop, size_t rc) { + aiop->Result = rc; + aiop->doneRead(); +} + +int XrdCephOssFile::Read(XrdSfsAio *aiop) { + return ceph_aio_read(m_fd, aiop, aioReadCallback); } ssize_t XrdCephOssFile::ReadRaw(void *buff, off_t offset, size_t blen) { @@ -86,17 +91,13 @@ ssize_t XrdCephOssFile::Write(const void *buff, off_t offset, size_t blen) { return rc; } +static void aioWriteCallback(XrdSfsAio *aiop, size_t rc) { + aiop->Result = rc; + aiop->doneWrite(); +} + int XrdCephOssFile::Write(XrdSfsAio *aiop) { - ssize_t rc = Write((void*)aiop->sfsAio.aio_buf, - aiop->sfsAio.aio_offset, - aiop->sfsAio.aio_nbytes); - if (aiop->sfsAio.aio_nbytes == (size_t)rc) { - aiop->Result = rc; - aiop->doneWrite(); - return 0; - } else { - return rc; - } + return ceph_aio_write(m_fd, aiop, aioWriteCallback); } int XrdCephOssFile::Fsync() { diff --git a/src/XrdCeph/XrdCephPosix.cc b/src/XrdCeph/XrdCephPosix.cc index 1b405b3d6d7..223c0e6a3c3 100644 --- a/src/XrdCeph/XrdCephPosix.cc +++ b/src/XrdCeph/XrdCephPosix.cc @@ -41,6 +41,7 @@ #include #include #include +#include "XrdSfs/XrdSfsAio.hh" #include "XrdCeph/XrdCephPosix.hh" @@ -66,6 +67,14 @@ struct DirIterator { librados::IoCtx *m_ioctx; }; +/// small struct for aio API callbacks +struct AioArgs { + AioArgs(XrdSfsAio* a, AioCB *b, size_t n) : aiop(a), callback(b), nbBytes(n) {} + XrdSfsAio* aiop; + AioCB *callback; + size_t nbBytes; +}; + /// global variables holding stripers and ioCtxs for each ceph pool plus the cluster object std::map g_radosStripers; std::map g_ioCtx; @@ -464,7 +473,8 @@ int ceph_posix_open(XrdOucEnv* env, const char *pathname, int flags, mode_t mode // in case of O_TRUNC, we should truncate the file if (flags & O_TRUNC) { int rc = ceph_posix_internal_truncate(fr, 0); - if (rc < 0) return rc; + // fail only if file exists and cannot be truncated + if (rc < 0 && rc != -ENOENT) return rc; } return g_nextCephFd-1; } @@ -542,6 +552,45 @@ ssize_t ceph_posix_write(int fd, const void *buf, size_t count) { } } +static void ceph_aio_complete(rados_completion_t c, void *arg) { + AioArgs *awa = reinterpret_cast(arg); + size_t rc = rados_aio_get_return_value(c); + awa->callback(awa->aiop, rc == 0 ? awa->nbBytes : rc); + delete(awa); +} + +ssize_t ceph_aio_write(int fd, XrdSfsAio *aiop, AioCB *cb) { + std::map::iterator it = g_fds.find(fd); + if (it != g_fds.end()) { + // get the parameters from the Xroot aio object + size_t count = aiop->sfsAio.aio_nbytes; + const char *buf = (const char*)aiop->sfsAio.aio_buf; + size_t offset = aiop->sfsAio.aio_offset; + // get the striper object + CephFileRef &fr = it->second; + logwrapper((char*)"ceph_aio_write: for fd %d, count=%d", fd, count); + if ((fr.flags & (O_WRONLY|O_RDWR)) == 0) { + return -EBADF; + } + libradosstriper::RadosStriper *striper = getRadosStriper(fr); + if (0 == striper) { + return -EINVAL; + } + // prepare a bufferlist around the given buffer + ceph::bufferlist bl; + bl.append(buf, count); + // prepare a ceph AioCompletion object and do async call + AioArgs *args = new AioArgs(aiop, cb, count); + librados::AioCompletion *completion = + g_cluster->aio_create_completion(args, ceph_aio_complete, NULL); + int rc = striper->aio_write(fr.name, completion, bl, count, offset); + completion->release(); + return rc; + } else { + return -EBADF; + } +} + ssize_t ceph_posix_read(int fd, void *buf, size_t count) { std::map::iterator it = g_fds.find(fd); if (it != g_fds.end()) { @@ -565,6 +614,39 @@ ssize_t ceph_posix_read(int fd, void *buf, size_t count) { } } +ssize_t ceph_aio_read(int fd, XrdSfsAio *aiop, AioCB *cb) { + std::map::iterator it = g_fds.find(fd); + if (it != g_fds.end()) { + // get the parameters from the Xroot aio object + size_t count = aiop->sfsAio.aio_nbytes; + const char *buf = (const char*)aiop->sfsAio.aio_buf; + size_t offset = aiop->sfsAio.aio_offset; + // get the striper object + CephFileRef &fr = it->second; + logwrapper((char*)"ceph_read: for fd %d, count=%d", fd, count); + if ((fr.flags & (O_WRONLY|O_RDWR)) != 0) { + return -EBADF; + } + libradosstriper::RadosStriper *striper = getRadosStriper(fr); + if (0 == striper) { + return -EINVAL; + } + // prepare a bufferlist to receive data + ceph::bufferlist bl; + // prepare a ceph AioCompletion object and do async call + AioArgs *args = new AioArgs(aiop, cb, count); + librados::AioCompletion *completion = + g_cluster->aio_create_completion(args, ceph_aio_complete, NULL); + int rc = striper->aio_read(fr.name, completion, &bl, count, offset); + completion->release(); + if (rc < 0) return rc; + bl.copy(0, rc, (char*)buf); + return rc; + } else { + return -EBADF; + } +} + int ceph_posix_fstat(int fd, struct stat *buf) { std::map::iterator it = g_fds.find(fd); if (it != g_fds.end()) { diff --git a/src/XrdCeph/XrdCephPosix.hh b/src/XrdCeph/XrdCephPosix.hh index bd3599b9700..d23956209b3 100644 --- a/src/XrdCeph/XrdCephPosix.hh +++ b/src/XrdCeph/XrdCephPosix.hh @@ -35,6 +35,9 @@ #include #include +class XrdSfsAio; +typedef void(AioCB)(XrdSfsAio*, size_t); + void ceph_posix_set_defaults(const char* value); void ceph_posix_disconnect_all(); void ceph_posix_set_logfunc(void (*logfunc) (char *, va_list argp)); @@ -43,7 +46,9 @@ int ceph_posix_close(int fd); off_t ceph_posix_lseek(int fd, off_t offset, int whence); off64_t ceph_posix_lseek64(int fd, off64_t offset, int whence); ssize_t ceph_posix_write(int fd, const void *buf, size_t count); +ssize_t ceph_aio_write(int fd, XrdSfsAio *aiop, AioCB *cb); ssize_t ceph_posix_read(int fd, void *buf, size_t count); +ssize_t ceph_aio_read(int fd, XrdSfsAio *aiop, AioCB *cb); int ceph_posix_fstat(int fd, struct stat *buf); int ceph_posix_stat(XrdOucEnv* env, const char *pathname, struct stat *buf); int ceph_posix_fsync(int fd);