diff --git a/src/XrdCeph/XrdCephPosix.cc b/src/XrdCeph/XrdCephPosix.cc index 644490ca00a..d0fadbeff60 100644 --- a/src/XrdCeph/XrdCephPosix.cc +++ b/src/XrdCeph/XrdCephPosix.cc @@ -42,6 +42,7 @@ #include #include #include "XrdSfs/XrdSfsAio.hh" +#include "XrdSys/XrdSysPthread.hh" #include "XrdCeph/XrdCephPosix.hh" @@ -69,22 +70,79 @@ struct DirIterator { /// small struct for aio API callbacks struct AioArgs { - AioArgs(XrdSfsAio* a, AioCB *b, size_t n) : aiop(a), callback(b), nbBytes(n) {} + AioArgs(XrdSfsAio* a, AioCB *b, size_t n, ceph::bufferlist *_bl=0) : + aiop(a), callback(b), nbBytes(n), bl(_bl) {} XrdSfsAio* aiop; AioCB *callback; size_t nbBytes; + ceph::bufferlist *bl; }; /// global variables holding stripers and ioCtxs for each ceph pool plus the cluster object std::map g_radosStripers; std::map g_ioCtx; librados::Rados* g_cluster = 0; -/// global variable holding a map of file descriptor to file reference -std::map g_fds; +/// mutex protecting the striper and ioctx maps +XrdSysMutex g_striper_mutex; + /// global variable holding a list of files currently opened for write std::multiset g_filesOpenForWrite; +/// global variable holding a map of file descriptor to file reference +std::map g_fds; /// global variable remembering the next available file descriptor unsigned int g_nextCephFd = 0; +/// mutex protecting the map of file descriptors and the openForWrite multiset +XrdSysMutex g_fd_mutex; + +/// check whether a file is open for write +bool isOpenForWrite(std::string& name) { + XrdSysMutexHelper lock(g_fd_mutex); + return g_filesOpenForWrite.find(name) != g_filesOpenForWrite.end(); +} + +/// insert a filename in the list of files opened for write +void insertOpenForWrite(std::string& name) { + XrdSysMutexHelper lock(g_fd_mutex); + g_filesOpenForWrite.insert(name); +} + +/// delete a filename from the list of files opened for write +void deleteOpenForWrite(std::string& name) { + XrdSysMutexHelper lock(g_fd_mutex); + g_filesOpenForWrite.erase(g_filesOpenForWrite.find(name)); +} + +/// look for a FileRef from its file descriptor +CephFileRef* getFileRef(int fd) { + XrdSysMutexHelper lock(g_fd_mutex); + std::map::iterator it = g_fds.find(fd); + if (it != g_fds.end()) { + return &(it->second); + } else { + return 0; + } +} + +/// deletes a FileRef from the global table of file descriptors +void deleteFileRef(int fd) { + XrdSysMutexHelper lock(g_fd_mutex); + std::map::iterator it = g_fds.find(fd); + if (it != g_fds.end()) { + g_fds.erase(it); + } +} + +/** + * inserts a new FileRef into the global table of file descriptors + * and return the associated file descriptor + */ +int insertFileRef(CephFileRef &fr) { + XrdSysMutexHelper lock(g_fd_mutex); + g_fds[g_nextCephFd] = fr; + g_nextCephFd++; + return g_nextCephFd-1; +} + /// global variable containing defaults for CephFiles CephFile g_defaultParams = { "", "default", // default pool @@ -311,7 +369,7 @@ static CephFileRef getCephFileRef(const char *path, XrdOucEnv *env, int flags, return fr; } -static libradosstriper::RadosStriper* getRadosStriper(const CephFile& file) { +static libradosstriper::RadosStriper* getRadosStriperNoLock(const CephFile& file) { std::stringstream ss; ss << file.userId << '@' << file.pool << ',' << file.nbStripes << ',' << file.stripeUnit << ',' << file.objectSize; @@ -420,8 +478,14 @@ static libradosstriper::RadosStriper* getRadosStriper(const CephFile& file) { return it->second; } +static libradosstriper::RadosStriper* getRadosStriper(const CephFile& file) { + XrdSysMutexHelper lock(g_striper_mutex); + return getRadosStriperNoLock(file); +} + static librados::IoCtx* getIoCtx(const CephFile& file) { - libradosstriper::RadosStriper *striper = getRadosStriper(file); + XrdSysMutexHelper lock(g_striper_mutex); + libradosstriper::RadosStriper *striper = getRadosStriperNoLock(file); if (0 == striper) { return 0; } @@ -433,6 +497,7 @@ static librados::IoCtx* getIoCtx(const CephFile& file) { } void ceph_posix_disconnect_all() { + XrdSysMutexHelper lock(g_striper_mutex); for (std::map::iterator it = g_radosStripers.begin(); it != g_radosStripers.end(); @@ -456,12 +521,11 @@ void ceph_posix_set_logfunc(void (*logfunc) (char *, va_list argp)) { static int ceph_posix_internal_truncate(const CephFile &file, unsigned long long size); int ceph_posix_open(XrdOucEnv* env, const char *pathname, int flags, mode_t mode) { - logwrapper((char*)"ceph_open : fd %d associated to %s", g_nextCephFd, pathname); CephFileRef fr = getCephFileRef(pathname, env, flags, mode, 0); - g_fds[g_nextCephFd] = fr; - g_nextCephFd++; + int fd = insertFileRef(fr); + logwrapper((char*)"ceph_open : fd %d associated to %s", fd, pathname); if (flags & (O_WRONLY|O_RDWR)) { - g_filesOpenForWrite.insert(fr.name); + insertOpenForWrite(fr.name); } // in case of O_CREAT and O_EXCL, we should complain if the file exists if ((flags & O_CREAT) && (flags & O_EXCL)) { @@ -480,17 +544,17 @@ int ceph_posix_open(XrdOucEnv* env, const char *pathname, int flags, mode_t mode // fail only if file exists and cannot be truncated if (rc < 0 && rc != -ENOENT) return rc; } - return g_nextCephFd-1; + return fd; } int ceph_posix_close(int fd) { - std::map::iterator it = g_fds.find(fd); - if (it != g_fds.end()) { + CephFileRef* fr = getFileRef(fd); + if (fr) { logwrapper((char*)"ceph_close: closed fd %d", fd); - if (it->second.flags & (O_WRONLY|O_RDWR)) { - g_filesOpenForWrite.erase(g_filesOpenForWrite.find(it->second.name)); + if (fr->flags & (O_WRONLY|O_RDWR)) { + deleteOpenForWrite(fr->name); } - g_fds.erase(it); + deleteFileRef(fd); return 0; } else { return -EBADF; @@ -512,44 +576,41 @@ static off64_t lseek_compute_offset(CephFileRef &fr, off64_t offset, int whence) } off_t ceph_posix_lseek(int fd, off_t offset, int whence) { - std::map::iterator it = g_fds.find(fd); - if (it != g_fds.end()) { - CephFileRef &fr = it->second; + CephFileRef* fr = getFileRef(fd); + if (fr) { logwrapper((char*)"ceph_lseek: for fd %d, offset=%lld, whence=%d", fd, offset, whence); - return (off_t)lseek_compute_offset(fr, offset, whence); + return (off_t)lseek_compute_offset(*fr, offset, whence); } else { return -EBADF; } } off64_t ceph_posix_lseek64(int fd, off64_t offset, int whence) { - std::map::iterator it = g_fds.find(fd); - if (it != g_fds.end()) { - CephFileRef &fr = it->second; + CephFileRef* fr = getFileRef(fd); + if (fr) { logwrapper((char*)"ceph_lseek64: for fd %d, offset=%lld, whence=%d", fd, offset, whence); - return lseek_compute_offset(fr, offset, whence); + return lseek_compute_offset(*fr, offset, whence); } else { return -EBADF; } } ssize_t ceph_posix_write(int fd, const void *buf, size_t count) { - std::map::iterator it = g_fds.find(fd); - if (it != g_fds.end()) { - CephFileRef &fr = it->second; + CephFileRef* fr = getFileRef(fd); + if (fr) { logwrapper((char*)"ceph_write: for fd %d, count=%d", fd, count); - if ((fr.flags & (O_WRONLY|O_RDWR)) == 0) { + if ((fr->flags & (O_WRONLY|O_RDWR)) == 0) { return -EBADF; } - libradosstriper::RadosStriper *striper = getRadosStriper(fr); + libradosstriper::RadosStriper *striper = getRadosStriper(*fr); if (0 == striper) { return -EINVAL; } ceph::bufferlist bl; bl.append((const char*)buf, count); - int rc = striper->write(fr.name, bl, count, fr.offset); + int rc = striper->write(fr->name, bl, count, fr->offset); if (rc) return rc; - fr.offset += count; + fr->offset += count; return count; } else { return -EBADF; @@ -557,20 +618,19 @@ ssize_t ceph_posix_write(int fd, const void *buf, size_t count) { } ssize_t ceph_posix_pwrite(int fd, const void *buf, size_t count, off64_t offset) { - std::map::iterator it = g_fds.find(fd); - if (it != g_fds.end()) { - CephFileRef &fr = it->second; + CephFileRef* fr = getFileRef(fd); + if (fr) { logwrapper((char*)"ceph_write: for fd %d, count=%d", fd, count); - if ((fr.flags & (O_WRONLY|O_RDWR)) == 0) { + if ((fr->flags & (O_WRONLY|O_RDWR)) == 0) { return -EBADF; } - libradosstriper::RadosStriper *striper = getRadosStriper(fr); + libradosstriper::RadosStriper *striper = getRadosStriper(*fr); if (0 == striper) { return -EINVAL; } ceph::bufferlist bl; bl.append((const char*)buf, count); - int rc = striper->write(fr.name, bl, count, offset); + int rc = striper->write(fr->name, bl, count, offset); if (rc) return rc; return count; } else { @@ -578,7 +638,7 @@ ssize_t ceph_posix_pwrite(int fd, const void *buf, size_t count, off64_t offset) } } -static void ceph_aio_complete(rados_completion_t c, void *arg) { +static void ceph_aio_write_complete(rados_completion_t c, void *arg) { AioArgs *awa = reinterpret_cast(arg); size_t rc = rados_aio_get_return_value(c); awa->callback(awa->aiop, rc == 0 ? awa->nbBytes : rc); @@ -586,19 +646,18 @@ static void ceph_aio_complete(rados_completion_t c, void *arg) { } ssize_t ceph_aio_write(int fd, XrdSfsAio *aiop, AioCB *cb) { - std::map::iterator it = g_fds.find(fd); - if (it != g_fds.end()) { + CephFileRef* fr = getFileRef(fd); + if (fr) { // get the parameters from the Xroot aio object size_t count = aiop->sfsAio.aio_nbytes; const char *buf = (const char*)aiop->sfsAio.aio_buf; size_t offset = aiop->sfsAio.aio_offset; // get the striper object - CephFileRef &fr = it->second; logwrapper((char*)"ceph_aio_write: for fd %d, count=%d", fd, count); - if ((fr.flags & (O_WRONLY|O_RDWR)) == 0) { + if ((fr->flags & (O_WRONLY|O_RDWR)) == 0) { return -EBADF; } - libradosstriper::RadosStriper *striper = getRadosStriper(fr); + libradosstriper::RadosStriper *striper = getRadosStriper(*fr); if (0 == striper) { return -EINVAL; } @@ -608,32 +667,31 @@ ssize_t ceph_aio_write(int fd, XrdSfsAio *aiop, AioCB *cb) { // prepare a ceph AioCompletion object and do async call AioArgs *args = new AioArgs(aiop, cb, count); librados::AioCompletion *completion = - g_cluster->aio_create_completion(args, ceph_aio_complete, NULL); - int rc = striper->aio_write(fr.name, completion, bl, count, offset); + g_cluster->aio_create_completion(args, ceph_aio_write_complete, NULL); + int rc = striper->aio_write(fr->name, completion, bl, count, offset); completion->release(); return rc; } else { return -EBADF; } -} +} ssize_t ceph_posix_read(int fd, void *buf, size_t count) { - std::map::iterator it = g_fds.find(fd); - if (it != g_fds.end()) { - CephFileRef &fr = it->second; + CephFileRef* fr = getFileRef(fd); + if (fr) { logwrapper((char*)"ceph_read: for fd %d, count=%d", fd, count); - if ((fr.flags & (O_WRONLY|O_RDWR)) != 0) { + if ((fr->flags & (O_WRONLY|O_RDWR)) != 0) { return -EBADF; } - libradosstriper::RadosStriper *striper = getRadosStriper(fr); + libradosstriper::RadosStriper *striper = getRadosStriper(*fr); if (0 == striper) { return -EINVAL; } ceph::bufferlist bl; - int rc = striper->read(fr.name, &bl, count, fr.offset); + int rc = striper->read(fr->name, &bl, count, fr->offset); if (rc < 0) return rc; bl.copy(0, rc, (char*)buf); - fr.offset += rc; + fr->offset += rc; return rc; } else { return -EBADF; @@ -641,19 +699,18 @@ ssize_t ceph_posix_read(int fd, void *buf, size_t count) { } ssize_t ceph_posix_pread(int fd, void *buf, size_t count, off64_t offset) { - std::map::iterator it = g_fds.find(fd); - if (it != g_fds.end()) { - CephFileRef &fr = it->second; + CephFileRef* fr = getFileRef(fd); + if (fr) { logwrapper((char*)"ceph_read: for fd %d, count=%d", fd, count); - if ((fr.flags & (O_WRONLY|O_RDWR)) != 0) { + if ((fr->flags & (O_WRONLY|O_RDWR)) != 0) { return -EBADF; } - libradosstriper::RadosStriper *striper = getRadosStriper(fr); + libradosstriper::RadosStriper *striper = getRadosStriper(*fr); if (0 == striper) { return -EINVAL; } ceph::bufferlist bl; - int rc = striper->read(fr.name, &bl, count, offset); + int rc = striper->read(fr->name, &bl, count, offset); if (rc < 0) return rc; bl.copy(0, rc, (char*)buf); return rc; @@ -662,33 +719,43 @@ ssize_t ceph_posix_pread(int fd, void *buf, size_t count, off64_t offset) { } } +static void ceph_aio_read_complete(rados_completion_t c, void *arg) { + AioArgs *awa = reinterpret_cast(arg); + size_t rc = rados_aio_get_return_value(c); + if (awa->bl) { + if (rc > 0) { + awa->bl->copy(0, rc, (char*)awa->aiop->sfsAio.aio_buf); + } + delete awa->bl; + awa->bl = 0; + } + awa->callback(awa->aiop, rc == 0 ? awa->nbBytes : rc); + delete(awa); +} + ssize_t ceph_aio_read(int fd, XrdSfsAio *aiop, AioCB *cb) { - std::map::iterator it = g_fds.find(fd); - if (it != g_fds.end()) { + CephFileRef* fr = getFileRef(fd); + if (fr) { // get the parameters from the Xroot aio object size_t count = aiop->sfsAio.aio_nbytes; - const char *buf = (const char*)aiop->sfsAio.aio_buf; size_t offset = aiop->sfsAio.aio_offset; // get the striper object - CephFileRef &fr = it->second; logwrapper((char*)"ceph_read: for fd %d, count=%d", fd, count); - if ((fr.flags & (O_WRONLY|O_RDWR)) != 0) { + if ((fr->flags & (O_WRONLY|O_RDWR)) != 0) { return -EBADF; } - libradosstriper::RadosStriper *striper = getRadosStriper(fr); + libradosstriper::RadosStriper *striper = getRadosStriper(*fr); if (0 == striper) { return -EINVAL; } // prepare a bufferlist to receive data - ceph::bufferlist bl; + ceph::bufferlist *bl = new ceph::bufferlist(); // prepare a ceph AioCompletion object and do async call - AioArgs *args = new AioArgs(aiop, cb, count); + AioArgs *args = new AioArgs(aiop, cb, count, bl); librados::AioCompletion *completion = - g_cluster->aio_create_completion(args, ceph_aio_complete, NULL); - int rc = striper->aio_read(fr.name, completion, &bl, count, offset); + g_cluster->aio_create_completion(args, ceph_aio_read_complete, NULL); + int rc = striper->aio_read(fr->name, completion, bl, count, offset); completion->release(); - if (rc < 0) return rc; - bl.copy(0, rc, (char*)buf); return rc; } else { return -EBADF; @@ -696,19 +763,18 @@ ssize_t ceph_aio_read(int fd, XrdSfsAio *aiop, AioCB *cb) { } int ceph_posix_fstat(int fd, struct stat *buf) { - std::map::iterator it = g_fds.find(fd); - if (it != g_fds.end()) { - CephFileRef &fr = it->second; + CephFileRef* fr = getFileRef(fd); + if (fr) { logwrapper((char*)"ceph_stat: fd %d", fd); // minimal stat : only size and times are filled // atime, mtime and ctime are set all to the same value // mode is set arbitrarily to 0666 - libradosstriper::RadosStriper *striper = getRadosStriper(fr); + libradosstriper::RadosStriper *striper = getRadosStriper(*fr); if (0 == striper) { return -EINVAL; } memset(buf, 0, sizeof(*buf)); - int rc = striper->stat(fr.name, (uint64_t*)&(buf->st_size), &(buf->st_atime)); + int rc = striper->stat(fr->name, (uint64_t*)&(buf->st_size), &(buf->st_atime)); if (rc != 0) { return -rc; } @@ -736,7 +802,7 @@ int ceph_posix_stat(XrdOucEnv* env, const char *pathname, struct stat *buf) { if (rc != 0) { // for non existing file. Check that we did not open it for write recently // in that case, we return 0 size and current time - if (-ENOENT == rc && g_filesOpenForWrite.find(file.name) != g_filesOpenForWrite.end()) { + if (-ENOENT == rc && isOpenForWrite(file.name)) { buf->st_size = 0; buf->st_atime = time(NULL); } else { @@ -750,8 +816,8 @@ int ceph_posix_stat(XrdOucEnv* env, const char *pathname, struct stat *buf) { } int ceph_posix_fsync(int fd) { - std::map::iterator it = g_fds.find(fd); - if (it != g_fds.end()) { + CephFileRef* fr = getFileRef(fd); + if (fr) { logwrapper((char*)"ceph_sync: fd %d", fd); return 0; } else { @@ -760,14 +826,13 @@ int ceph_posix_fsync(int fd) { } int ceph_posix_fcntl(int fd, int cmd, ... /* arg */ ) { - std::map::iterator it = g_fds.find(fd); - if (it != g_fds.end()) { - CephFileRef &fr = it->second; + CephFileRef* fr = getFileRef(fd); + if (fr) { logwrapper((char*)"ceph_fcntl: fd %d cmd=%d", fd, cmd); // minimal implementation switch (cmd) { case F_GETFL: - return fr.mode; + return fr->mode; default: return -EINVAL; } @@ -799,11 +864,10 @@ ssize_t ceph_posix_getxattr(XrdOucEnv* env, const char* path, ssize_t ceph_posix_fgetxattr(int fd, const char* name, void* value, size_t size) { - std::map::iterator it = g_fds.find(fd); - if (it != g_fds.end()) { - CephFileRef &fr = it->second; + CephFileRef* fr = getFileRef(fd); + if (fr) { logwrapper((char*)"ceph_fgetxattr: fd %d name=%s", fd, name); - return ceph_posix_internal_getxattr(fr, name, value, size); + return ceph_posix_internal_getxattr(*fr, name, value, size); } else { return -EBADF; } @@ -834,11 +898,10 @@ ssize_t ceph_posix_setxattr(XrdOucEnv* env, const char* path, int ceph_posix_fsetxattr(int fd, const char* name, const void* value, size_t size, int flags) { - std::map::iterator it = g_fds.find(fd); - if (it != g_fds.end()) { - CephFileRef &fr = it->second; + CephFileRef* fr = getFileRef(fd); + if (fr) { logwrapper((char*)"ceph_fsetxattr: fd %d name=%s value=%s", fd, name, value); - return ceph_posix_internal_setxattr(fr, name, value, size, flags); + return ceph_posix_internal_setxattr(*fr, name, value, size, flags); } else { return -EBADF; } @@ -863,11 +926,10 @@ int ceph_posix_removexattr(XrdOucEnv* env, const char* path, } int ceph_posix_fremovexattr(int fd, const char* name) { - std::map::iterator it = g_fds.find(fd); - if (it != g_fds.end()) { - CephFileRef &fr = it->second; + CephFileRef* fr = getFileRef(fd); + if (fr) { logwrapper((char*)"ceph_fremovexattr: fd %d name=%s", fd, name); - return ceph_posix_internal_removexattr(fr, name); + return ceph_posix_internal_removexattr(*fr, name); } else { return -EBADF; } @@ -913,11 +975,10 @@ int ceph_posix_listxattrs(XrdOucEnv* env, const char* path, XrdSysXAttr::AList * } int ceph_posix_flistxattrs(int fd, XrdSysXAttr::AList **aPL, int getSz) { - std::map::iterator it = g_fds.find(fd); - if (it != g_fds.end()) { - CephFileRef &fr = it->second; + CephFileRef* fr = getFileRef(fd); + if (fr) { logwrapper((char*)"ceph_flistxattrs: fd %d", fd); - return ceph_posix_internal_listxattrs(fr, aPL, getSz); + return ceph_posix_internal_listxattrs(*fr, aPL, getSz); } else { return -EBADF; } @@ -952,11 +1013,10 @@ static int ceph_posix_internal_truncate(const CephFile &file, unsigned long long } int ceph_posix_ftruncate(int fd, unsigned long long size) { - std::map::iterator it = g_fds.find(fd); - if (it != g_fds.end()) { - CephFileRef &fr = it->second; + CephFileRef* fr = getFileRef(fd); + if (fr) { logwrapper((char*)"ceph_posix_ftruncate: fd %d, size %d", fd, size); - return ceph_posix_internal_truncate(fr, size); + return ceph_posix_internal_truncate(*fr, size); } else { return -EBADF; }