diff --git a/src/XrdCeph/XrdCephPosix.cc b/src/XrdCeph/XrdCephPosix.cc
index 04b728393b9..eb8dc76e3d9 100644
--- a/src/XrdCeph/XrdCephPosix.cc
+++ b/src/XrdCeph/XrdCephPosix.cc
@@ -104,17 +104,24 @@ std::map<unsigned int, CephFileRef> g_fds;
 unsigned int g_nextCephFd = 0;
 /// mutex protecting the map of file descriptors and the openForWrite multiset
 XrdSysMutex g_fd_mutex;
+/// mutex protecting initialization of ceph clusters
+XrdSysMutex g_init_mutex;
 
 /// Accessor to next ceph pool index
 /// Note that this is not thread safe, but we do not care
 /// as we only want a rough load balancing
 unsigned int getCephPoolIdxAndIncrease() {
   if (g_radosStripers.size() == 0) {
-    // initialization phase : allocate corresponding places in the vectors
-    for (unsigned int i = 0; i < g_maxCephPoolIdx; i++) {
-      g_radosStripers.push_back(StriperDict());
-      g_ioCtx.push_back(IOCtxDict());
-      g_cluster.push_back(0);
+    // make sure we do not have a race condition here
+    XrdSysMutexHelper lock(g_init_mutex);
+    // double check now that we have the lock
+    if (g_radosStripers.size() == 0) {
+      // initialization phase : allocate corresponding places in the vectors
+      for (unsigned int i = 0; i < g_maxCephPoolIdx; i++) {
+        g_radosStripers.push_back(StriperDict());
+        g_ioCtx.push_back(IOCtxDict());
+        g_cluster.push_back(0);
+      }
     }
   }
   unsigned int res = g_cephPoolIdx;
@@ -401,50 +408,59 @@ static CephFileRef getCephFileRef(const char *path, XrdOucEnv *env, int flags,
   return fr;
 }
 
+inline librados::Rados* checkAndCreateCluster(unsigned int cephPoolIdx,
+                                              std::string userId = g_defaultParams.userId) {
+  if (0 == g_cluster[cephPoolIdx]) {
+    // create connection to cluster
+    librados::Rados *cluster = new librados::Rados;
+    if (0 == cluster) {
+      return 0;
+    }
+    int rc = cluster->init(userId.c_str());
+    if (rc) {
+      delete cluster;
+      return 0;
+    }
+    rc = cluster->conf_read_file(NULL);
+    if (rc) {
+      cluster->shutdown();
+      delete cluster;
+      return 0;
+    }
+    cluster->conf_parse_env(NULL);
+    rc = cluster->connect();
+    if (rc) {
+      cluster->shutdown();
+      delete cluster;
+      return 0;
+    }
+    g_cluster[cephPoolIdx] = cluster;
+  }
+  return g_cluster[cephPoolIdx];
+}
+
 int checkAndCreateStriper(unsigned int cephPoolIdx, std::string &userAtPool, const CephFile& file) {
   StriperDict &sDict = g_radosStripers[cephPoolIdx];
   StriperDict::iterator it = sDict.find(userAtPool);
   if (it == sDict.end()) {
     // we need to create a new radosStriper
-    // Do we already have a cluster
-    if (0 == g_cluster[cephPoolIdx]) {
-      // create connection to cluster
-      librados::Rados *cluster = new librados::Rados;
-      if (0 == cluster) {
-        return 0;
-      }
-      int rc = cluster->init(file.userId.c_str());
-      if (rc) {
-        delete cluster;
-        return 0;
-      }
-      rc = cluster->conf_read_file(NULL);
-      if (rc) {
-        cluster->shutdown();
-        delete cluster;
-        return 0;
-      }
-      cluster->conf_parse_env(NULL);
-      rc = cluster->connect();
-      if (rc) {
-        cluster->shutdown();
-        delete cluster;
-        return 0;
-      }
-      g_cluster[cephPoolIdx] = cluster;
+    // Get a cluster
+    librados::Rados* cluster = checkAndCreateCluster(cephPoolIdx, file.userId);
+    if (0 == cluster) {
+      return 0;
     }
     // create IoCtx for our pool
     librados::IoCtx *ioctx = new librados::IoCtx;
     if (0 == ioctx) {
-      g_cluster[cephPoolIdx]->shutdown();
-      delete g_cluster[cephPoolIdx];
+      cluster->shutdown();
+      delete cluster;
       g_cluster[cephPoolIdx] = 0;
       return 0;
     }
     int rc = g_cluster[cephPoolIdx]->ioctx_create(file.pool.c_str(), *ioctx);
     if (rc != 0) {
-      g_cluster[cephPoolIdx]->shutdown();
-      delete g_cluster[cephPoolIdx];
+      cluster->shutdown();
+      delete cluster;
       g_cluster[cephPoolIdx] = 0;
       delete ioctx;
       return 0;
@@ -453,8 +469,8 @@ int checkAndCreateStriper(unsigned int cephPoolIdx, std::string &userAtPool, con
     libradosstriper::RadosStriper *striper = new libradosstriper::RadosStriper;
     if (0 == striper) {
       delete ioctx;
-      g_cluster[cephPoolIdx]->shutdown();
-      delete g_cluster[cephPoolIdx];
+      cluster->shutdown();
+      delete cluster;
       g_cluster[cephPoolIdx] = 0;
       return 0;
     }
@@ -462,8 +478,8 @@ int checkAndCreateStriper(unsigned int cephPoolIdx, std::string &userAtPool, con
     if (rc != 0) {
       delete striper;
       delete ioctx;
-      g_cluster[cephPoolIdx]->shutdown();
-      delete g_cluster[cephPoolIdx];
+      cluster->shutdown();
+      delete cluster;
       g_cluster[cephPoolIdx] = 0;
       return 0;
     }
@@ -473,8 +489,8 @@ int checkAndCreateStriper(unsigned int cephPoolIdx, std::string &userAtPool, con
       logwrapper((char*)"getRadosStriper : invalid nbStripes %d", file.nbStripes);
       delete striper;
       delete ioctx;
-      g_cluster[cephPoolIdx]->shutdown();
-      delete g_cluster[cephPoolIdx];
+      cluster->shutdown();
+      delete cluster;
       g_cluster[cephPoolIdx] = 0;
       return 0;
     }
@@ -483,8 +499,8 @@ int checkAndCreateStriper(unsigned int cephPoolIdx, std::string &userAtPool, con
       logwrapper((char*)"getRadosStriper : invalid stripeUnit %d (must be non0, multiple of 64K)", file.stripeUnit);
       delete striper;
       delete ioctx;
-      g_cluster[cephPoolIdx]->shutdown();
-      delete g_cluster[cephPoolIdx];
+      cluster->shutdown();
+      delete cluster;
       g_cluster[cephPoolIdx] = 0;
       return 0;
     }
@@ -493,8 +509,8 @@ int checkAndCreateStriper(unsigned int cephPoolIdx, std::string &userAtPool, con
       logwrapper((char*)"getRadosStriper : invalid objectSize %d (must be non 0, multiple of stripe_unit)", file.objectSize);
       delete striper;
       delete ioctx;
-      g_cluster[cephPoolIdx]->shutdown();
-      delete g_cluster[cephPoolIdx];
+      cluster->shutdown();
+      delete cluster;
       g_cluster[cephPoolIdx] = 0;
       return 0;
     }
@@ -702,10 +718,18 @@ ssize_t ceph_aio_write(int fd, XrdSfsAio *aiop, AioCB *cb) {
     // prepare a bufferlist around the given buffer
     ceph::bufferlist bl;
     bl.append(buf, count);
+    // get the poolIdx to use
+    int cephPoolIdx = getCephPoolIdxAndIncrease();
+    // Get the cluster to use
+    librados::Rados* cluster = checkAndCreateCluster(cephPoolIdx);
+    if (0 == cluster) {
+      return -EINVAL;
+    }
     // prepare a ceph AioCompletion object and do async call
     AioArgs *args = new AioArgs(aiop, cb, count);
     librados::AioCompletion *completion =
-      g_cluster[getCephPoolIdxAndIncrease()]->aio_create_completion(args, ceph_aio_write_complete, NULL);
+      cluster->aio_create_completion(args, ceph_aio_write_complete, NULL);
+    // do the write
     int rc = striper->aio_write(fr->name, completion, bl, count, offset);
     completion->release();
     return rc;
@@ -788,10 +812,18 @@ ssize_t ceph_aio_read(int fd, XrdSfsAio *aiop, AioCB *cb) {
     }
     // prepare a bufferlist to receive data
     ceph::bufferlist *bl = new ceph::bufferlist();
+    // get the poolIdx to use
+    int cephPoolIdx = getCephPoolIdxAndIncrease();
+    // Get the cluster to use
+    librados::Rados* cluster = checkAndCreateCluster(cephPoolIdx);
+    if (0 == cluster) {
+      return -EINVAL;
+    }
     // prepare a ceph AioCompletion object and do async call
     AioArgs *args = new AioArgs(aiop, cb, count, bl);
     librados::AioCompletion *completion =
-      g_cluster[getCephPoolIdxAndIncrease()]->aio_create_completion(args, ceph_aio_read_complete, NULL);
+      cluster->aio_create_completion(args, ceph_aio_read_complete, NULL);
+    // do the read
     int rc = striper->aio_read(fr->name, completion, bl, count, offset);
     completion->release();
     return rc;
@@ -1033,8 +1065,16 @@ void ceph_posix_freexattrlist(XrdSysXAttr::AList *aPL) {
 
 int ceph_posix_statfs(long long *totalSpace, long long *freeSpace) {
   logwrapper((char*)"ceph_posix_statfs");
+  // get the poolIdx to use
+  int cephPoolIdx = getCephPoolIdxAndIncrease();
+  // Get the cluster to use
+  librados::Rados* cluster = checkAndCreateCluster(cephPoolIdx);
+  if (0 == cluster) {
+    return -EINVAL;
+  }
+  // call ceph stat
   librados::cluster_stat_t result;
-  int rc = g_cluster[getCephPoolIdxAndIncrease()]->cluster_stat(result);
+  int rc = cluster->cluster_stat(result);
   if (0 == rc) {
     *totalSpace = result.kb * 1024;
     *freeSpace = result.kb_avail * 1024;
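
Note on the pattern used above: the race in getCephPoolIdxAndIncrease() is closed with g_init_mutex and a double-checked "test, lock, re-test" around the one-time vector initialization. A minimal standalone sketch of that pattern follows; it is illustrative only: the names (ensureInitialized, g_entries, g_maxIdx) are not from XrdCephPosix.cc, and std::mutex / std::lock_guard stand in for XrdSysMutex / XrdSysMutexHelper.

// Sketch only: double-checked lazy initialization, mirroring the patch above.
#include <mutex>
#include <vector>

static std::vector<int> g_entries;       // stands in for g_radosStripers / g_ioCtx / g_cluster
static std::mutex g_init_mutex;          // protects the one-time initialization only
static const unsigned int g_maxIdx = 8;  // stands in for g_maxCephPoolIdx

static void ensureInitialized() {
  if (g_entries.size() == 0) {                      // fast path: cheap unlocked check
    std::lock_guard<std::mutex> lock(g_init_mutex); // serialize first-time callers
    if (g_entries.size() == 0) {                    // re-check under the lock: another
      for (unsigned int i = 0; i < g_maxIdx; i++) { // thread may have initialized already
        g_entries.push_back(0);
      }
    }
  }
}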