From b30cb1efdbfc096b62ba90fdab43536f9c8b9eb2 Mon Sep 17 00:00:00 2001
From: Sebastien Ponce
Date: Tue, 24 May 2016 15:57:17 +0200
Subject: [PATCH] [XrdCeph] Implemented pools of ceph objects

Allowed using multiple ceph objects in parallel when talking to ceph, to
work around the limitation that a single thread deals with async answers
per object.

In order to use this feature, one has to add a line to the xrootd config
file :

  ceph.nbconnections <n>

By default, only one object will be used.
---
 src/XrdCeph/XrdCephOss.cc   |  48 ++++++++++
 src/XrdCeph/XrdCephOss.hh   |   2 +
 src/XrdCeph/XrdCephPosix.cc | 184 ++++++++++++++++++++++--------------
 3 files changed, 161 insertions(+), 73 deletions(-)

diff --git a/src/XrdCeph/XrdCephOss.cc b/src/XrdCeph/XrdCephOss.cc
index b25e7171d22..8d11fd013cc 100644
--- a/src/XrdCeph/XrdCephOss.cc
+++ b/src/XrdCeph/XrdCephOss.cc
@@ -30,6 +30,7 @@
 #include "XrdOuc/XrdOucEnv.hh"
 #include "XrdSys/XrdSysError.hh"
 #include "XrdOuc/XrdOucTrace.hh"
+#include "XrdOuc/XrdOucStream.hh"
 #include "XrdVersion.hh"
 #include "XrdCeph/XrdCephOss.hh"
 #include "XrdCeph/XrdCephOssDir.hh"
@@ -77,6 +78,53 @@ XrdCephOss::~XrdCephOss() {
   ceph_posix_disconnect_all();
 }
 
+// declared and used in XrdCephPosix.cc
+extern unsigned int g_maxCephPoolIdx;
+
+int XrdCephOss::Configure(const char *configfn, XrdSysError &Eroute, XrdOucEnv *envP) {
+  int NoGo = 0;
+  XrdOucEnv myEnv;
+  XrdOucStream Config(&Eroute, getenv("XRDINSTANCE"), &myEnv, "=====> ");
+  // If there is no config file, nothing to be done
+  if (configfn && *configfn) {
+    // Try to open the configuration file.
+    int cfgFD;
+    if ((cfgFD = open(configfn, O_RDONLY, 0)) < 0) {
+      Eroute.Emsg("Config", errno, "open config file", configfn);
+      return 1;
+    }
+    Config.Attach(cfgFD);
+    // Now start reading records until eof.
+    char *var;
+    while((var = Config.GetMyFirstWord())) {
+      if (!strncmp(var, "ceph.nbconnections", 18)) {
+        var = Config.GetWord();
+        if (var) {
+          unsigned long value = strtoul(var, 0, 10);
+          if (value > 0 and value <= 100) {
+            g_maxCephPoolIdx = value;
+            break;
+          } else {
+            Eroute.Emsg("Config", "Invalid value for ceph.nbconnections in config file (must be between 1 and 100)", configfn, var);
+            return 1;
+          }
+        } else {
+          Eroute.Emsg("Config", "Missing value for ceph.nbconnections in config file", configfn);
+          return 1;
+        }
+      }
+    }
+
+    // Now check if any errors occurred during file i/o
+    int retc = Config.LastError();
+    if (retc) {
+      NoGo = Eroute.Emsg("Config", -retc, "read config file",
+                         configfn);
+    }
+    Config.Close();
+  }
+  return NoGo;
+}
+
 int XrdCephOss::Chmod(const char *path, mode_t mode, XrdOucEnv *envP) {
   return -ENOTSUP;
 }
diff --git a/src/XrdCeph/XrdCephOss.hh b/src/XrdCeph/XrdCephOss.hh
index 12f2ac5e231..a47f841ec72 100644
--- a/src/XrdCeph/XrdCephOss.hh
+++ b/src/XrdCeph/XrdCephOss.hh
@@ -55,6 +55,8 @@ public:
   XrdCephOss();
   virtual ~XrdCephOss();
 
+  int Configure(const char *, XrdSysError &, XrdOucEnv *envP);
+
   virtual int Chmod(const char *, mode_t mode, XrdOucEnv *eP=0);
   virtual int Create(const char *, const char *, mode_t, XrdOucEnv &, int opts=0);
   virtual int Init(XrdSysLogger *, const char*);
diff --git a/src/XrdCeph/XrdCephPosix.cc b/src/XrdCeph/XrdCephPosix.cc
index d0fadbeff60..04b728393b9 100644
--- a/src/XrdCeph/XrdCephPosix.cc
+++ b/src/XrdCeph/XrdCephPosix.cc
@@ -41,6 +41,7 @@
 #include
 #include
 #include
+#include <vector>
 
 #include "XrdSfs/XrdSfsAio.hh"
 #include "XrdSys/XrdSysPthread.hh"
@@ -78,12 +79,22 @@ struct AioArgs {
   ceph::bufferlist *bl;
 };
 
-/// global variables holding stripers and ioCtxs for each ceph pool plus the cluster object
-std::map<std::string, libradosstriper::RadosStriper*> g_radosStripers;
-std::map<std::string, librados::IoCtx*> g_ioCtx;
-librados::Rados* g_cluster = 0;
+/// global variables holding stripers/ioCtxs/cluster objects
+/// Note that we have a pool of them to circumvent the limitation
+/// of having a single objecter/messenger per IoCtx
+typedef std::map<std::string, libradosstriper::RadosStriper*> StriperDict;
+std::vector<StriperDict> g_radosStripers;
+typedef std::map<std::string, librados::IoCtx*> IOCtxDict;
+std::vector<IOCtxDict> g_ioCtx;
+std::vector<librados::Rados*> g_cluster;
 /// mutex protecting the striper and ioctx maps
 XrdSysMutex g_striper_mutex;
+/// index of current Striper/IoCtx to be used
+unsigned int g_cephPoolIdx = 0;
+/// size of the Striper/IoCtx pool, defaults to 1
+/// may be overwritten in the configuration file
+/// (See XrdCephOss::Configure)
+unsigned int g_maxCephPoolIdx = 1;
 /// global variable holding a list of files currently opened for write
 std::multiset<std::string> g_filesOpenForWrite;
@@ -94,6 +105,27 @@ unsigned int g_nextCephFd = 0;
 /// mutex protecting the map of file descriptors and the openForWrite multiset
 XrdSysMutex g_fd_mutex;
 
+/// Accessor to next ceph pool index
+/// Note that this is not thread safe, but we do not care
+/// as we only want a rough load balancing
+unsigned int getCephPoolIdxAndIncrease() {
+  if (g_radosStripers.size() == 0) {
+    // initialization phase : allocate corresponding places in the vectors
+    for (unsigned int i = 0; i < g_maxCephPoolIdx; i++) {
+      g_radosStripers.push_back(StriperDict());
+      g_ioCtx.push_back(IOCtxDict());
+      g_cluster.push_back(0);
+    }
+  }
+  unsigned int res = g_cephPoolIdx;
+  unsigned nextValue = g_cephPoolIdx+1;
+  if (nextValue >= g_maxCephPoolIdx) {
+    nextValue = 0;
+  }
+  g_cephPoolIdx = nextValue;
+  return res;
+}
+
 /// check whether a file is open for write
 bool isOpenForWrite(std::string& name) {
   XrdSysMutexHelper lock(g_fd_mutex);
@@ -369,56 +401,51 @@ static CephFileRef getCephFileRef(const char *path, XrdOucEnv *env, int flags,
   return fr;
 }
 
-static libradosstriper::RadosStriper* getRadosStriperNoLock(const CephFile& file) {
-  std::stringstream ss;
-  ss << file.userId << '@' << file.pool << ',' << file.nbStripes << ','
-     << file.stripeUnit << ',' << file.objectSize;
-  std::string userAtPool = ss.str();
-  std::map<std::string, libradosstriper::RadosStriper*>::iterator it =
-    g_radosStripers.find(userAtPool);
-  if (it == g_radosStripers.end()) {
+int checkAndCreateStriper(unsigned int cephPoolIdx, std::string &userAtPool, const CephFile& file) {
+  StriperDict &sDict = g_radosStripers[cephPoolIdx];
+  StriperDict::iterator it = sDict.find(userAtPool);
+  if (it == sDict.end()) {
     // we need to create a new radosStriper
     // Do we already have a cluster
-    if (0 == g_cluster) {
+    if (0 == g_cluster[cephPoolIdx]) {
       // create connection to cluster
-      g_cluster = new librados::Rados;
-      if (0 == g_cluster) {
+      librados::Rados *cluster = new librados::Rados;
+      if (0 == cluster) {
         return 0;
       }
-      int rc = g_cluster->init(file.userId.c_str());
+      int rc = cluster->init(file.userId.c_str());
       if (rc) {
-        delete g_cluster;
-        g_cluster = 0;
+        delete cluster;
        return 0;
       }
-      rc = g_cluster->conf_read_file(NULL);
+      rc = cluster->conf_read_file(NULL);
       if (rc) {
-        g_cluster->shutdown();
-        delete g_cluster;
-        g_cluster = 0;
+        cluster->shutdown();
+        delete cluster;
         return 0;
       }
-      g_cluster->conf_parse_env(NULL);
-      rc = g_cluster->connect();
+      cluster->conf_parse_env(NULL);
+      rc = cluster->connect();
       if (rc) {
-        g_cluster->shutdown();
-        delete g_cluster;
-        g_cluster = 0;
+        cluster->shutdown();
+        delete cluster;
         return 0;
       }
+      g_cluster[cephPoolIdx] = cluster;
     }
     // create IoCtx for our pool
     librados::IoCtx *ioctx = new librados::IoCtx;
     if (0 == ioctx) {
-      g_cluster->shutdown();
-      delete g_cluster;
+      g_cluster[cephPoolIdx]->shutdown();
+      delete g_cluster[cephPoolIdx];
+      g_cluster[cephPoolIdx] = 0;
       return 0;
     }
-    int rc = g_cluster->ioctx_create(file.pool.c_str(), *ioctx);
+    int rc = g_cluster[cephPoolIdx]->ioctx_create(file.pool.c_str(), *ioctx);
     if (rc != 0) {
-      g_cluster->shutdown();
-      delete g_cluster;
-      g_cluster = 0;
+      g_cluster[cephPoolIdx]->shutdown();
+      delete g_cluster[cephPoolIdx];
+      g_cluster[cephPoolIdx] = 0;
       delete ioctx;
       return 0;
     }
@@ -426,18 +453,18 @@ static libradosstriper::RadosStriper* getRadosStriperNoLock(const CephFile& file
     libradosstriper::RadosStriper *striper = new libradosstriper::RadosStriper;
     if (0 == striper) {
       delete ioctx;
-      g_cluster->shutdown();
-      delete g_cluster;
-      g_cluster = 0;
+      g_cluster[cephPoolIdx]->shutdown();
+      delete g_cluster[cephPoolIdx];
+      g_cluster[cephPoolIdx] = 0;
       return 0;
     }
     rc = libradosstriper::RadosStriper::striper_create(*ioctx, striper);
     if (rc != 0) {
       delete striper;
       delete ioctx;
-      g_cluster->shutdown();
-      delete g_cluster;
-      g_cluster = 0;
+      g_cluster[cephPoolIdx]->shutdown();
+      delete g_cluster[cephPoolIdx];
+      g_cluster[cephPoolIdx] = 0;
       return 0;
     }
     // setup layout
@@ -446,9 +473,9 @@ static libradosstriper::RadosStriper* getRadosStriperNoLock(const CephFile& file
       logwrapper((char*)"getRadosStriper : invalid nbStripes %d", file.nbStripes);
       delete striper;
       delete ioctx;
-      g_cluster->shutdown();
-      delete g_cluster;
-      g_cluster = 0;
+      g_cluster[cephPoolIdx]->shutdown();
+      delete g_cluster[cephPoolIdx];
+      g_cluster[cephPoolIdx] = 0;
       return 0;
     }
     rc = striper->set_object_layout_stripe_unit(file.stripeUnit);
@@ -456,9 +483,9 @@ static libradosstriper::RadosStriper* getRadosStriperNoLock(const CephFile& file
       logwrapper((char*)"getRadosStriper : invalid stripeUnit %d (must be non0, multiple of 64K)", file.stripeUnit);
       delete striper;
       delete ioctx;
-      g_cluster->shutdown();
-      delete g_cluster;
-      g_cluster = 0;
+      g_cluster[cephPoolIdx]->shutdown();
+      delete g_cluster[cephPoolIdx];
+      g_cluster[cephPoolIdx] = 0;
       return 0;
     }
     rc = striper->set_object_layout_object_size(file.objectSize);
@@ -466,52 +493,63 @@ static libradosstriper::RadosStriper* getRadosStriperNoLock(const CephFile& file
       logwrapper((char*)"getRadosStriper : invalid objectSize %d (must be non 0, multiple of stripe_unit)", file.objectSize);
       delete striper;
       delete ioctx;
-      g_cluster->shutdown();
-      delete g_cluster;
-      g_cluster = 0;
+      g_cluster[cephPoolIdx]->shutdown();
+      delete g_cluster[cephPoolIdx];
+      g_cluster[cephPoolIdx] = 0;
       return 0;
     }
-    g_ioCtx.insert(std::pair<std::string, librados::IoCtx*>(userAtPool, ioctx));
-    it = g_radosStripers.insert(std::pair<std::string, libradosstriper::RadosStriper*>
-                                (userAtPool, striper)).first;
+    IOCtxDict & ioDict = g_ioCtx[cephPoolIdx];
+    ioDict.insert(std::pair<std::string, librados::IoCtx*>(userAtPool, ioctx));
+    sDict.insert(std::pair<std::string, libradosstriper::RadosStriper*>
+                 (userAtPool, striper)).first;
   }
-  return it->second;
-}
+  return 1;
+}
 
 static libradosstriper::RadosStriper* getRadosStriper(const CephFile& file) {
   XrdSysMutexHelper lock(g_striper_mutex);
-  return getRadosStriperNoLock(file);
+  std::stringstream ss;
+  ss << file.userId << '@' << file.pool << ',' << file.nbStripes << ','
+     << file.stripeUnit << ',' << file.objectSize;
+  std::string userAtPool = ss.str();
+  unsigned int cephPoolIdx = getCephPoolIdxAndIncrease();
+  if (checkAndCreateStriper(cephPoolIdx, userAtPool, file) == 0) {
+    return 0;
+  }
+  return g_radosStripers[cephPoolIdx][userAtPool];
 }
 
 static librados::IoCtx* getIoCtx(const CephFile& file) {
   XrdSysMutexHelper lock(g_striper_mutex);
-  libradosstriper::RadosStriper *striper = getRadosStriperNoLock(file);
-  if (0 == striper) {
-    return 0;
-  }
   std::stringstream ss;
   ss << file.userId << '@' << file.pool << ',' << file.nbStripes << ','
      << file.stripeUnit << ',' << file.objectSize;
   std::string userAtPool = ss.str();
-  return g_ioCtx[userAtPool];
+  unsigned int cephPoolIdx = getCephPoolIdxAndIncrease();
+  if (checkAndCreateStriper(cephPoolIdx, userAtPool, file) == 0) {
+    return 0;
+  }
+  return g_ioCtx[cephPoolIdx][userAtPool];
 }
 
 void ceph_posix_disconnect_all() {
   XrdSysMutexHelper lock(g_striper_mutex);
-  for (std::map<std::string, libradosstriper::RadosStriper*>::iterator it =
-         g_radosStripers.begin();
-       it != g_radosStripers.end();
-       it++) {
-    delete it->second;
+  for (unsigned int i = 0; i < g_maxCephPoolIdx; i++) {
+    for (StriperDict::iterator it2 = g_radosStripers[i].begin();
+         it2 != g_radosStripers[i].end();
+         it2++) {
+      delete it2->second;
+    }
+    for (IOCtxDict::iterator it2 = g_ioCtx[i].begin();
+         it2 != g_ioCtx[i].end();
+         it2++) {
+      delete it2->second;
+    }
+    delete g_cluster[i];
   }
   g_radosStripers.clear();
-  for (std::map<std::string, librados::IoCtx*>::iterator it = g_ioCtx.begin();
-       it != g_ioCtx.end();
-       it++) {
-    delete it->second;
-  }
   g_ioCtx.clear();
-  delete g_cluster;
+  g_cluster.clear();
 }
 
 void ceph_posix_set_logfunc(void (*logfunc) (char *, va_list argp)) {
@@ -667,7 +705,7 @@ ssize_t ceph_aio_write(int fd, XrdSfsAio *aiop, AioCB *cb) {
   // prepare a ceph AioCompletion object and do async call
   AioArgs *args = new AioArgs(aiop, cb, count);
   librados::AioCompletion *completion =
-    g_cluster->aio_create_completion(args, ceph_aio_write_complete, NULL);
+    g_cluster[getCephPoolIdxAndIncrease()]->aio_create_completion(args, ceph_aio_write_complete, NULL);
   int rc = striper->aio_write(fr->name, completion, bl, count, offset);
   completion->release();
   return rc;
@@ -753,7 +791,7 @@ ssize_t ceph_aio_read(int fd, XrdSfsAio *aiop, AioCB *cb) {
   // prepare a ceph AioCompletion object and do async call
   AioArgs *args = new AioArgs(aiop, cb, count, bl);
   librados::AioCompletion *completion =
-    g_cluster->aio_create_completion(args, ceph_aio_read_complete, NULL);
+    g_cluster[getCephPoolIdxAndIncrease()]->aio_create_completion(args, ceph_aio_read_complete, NULL);
   int rc = striper->aio_read(fr->name, completion, bl, count, offset);
   completion->release();
   return rc;
@@ -996,7 +1034,7 @@ void ceph_posix_freexattrlist(XrdSysXAttr::AList *aPL) {
 int ceph_posix_statfs(long long *totalSpace, long long *freeSpace) {
   logwrapper((char*)"ceph_posix_statfs");
   librados::cluster_stat_t result;
-  int rc = g_cluster->cluster_stat(result);
+  int rc = g_cluster[getCephPoolIdxAndIncrease()]->cluster_stat(result);
   if (0 == rc) {
     *totalSpace = result.kb * 1024;
     *freeSpace = result.kb_avail * 1024;
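
For reference, the only configuration needed to enable the pool is the new directive parsed
in XrdCephOss::Configure above. A minimal illustrative snippet for an xrootd config file
follows; the value 10 is an arbitrary example, any value from 1 to 100 passes the check in
Configure, and 1 (a single ceph object) remains the default when the directive is absent:

    # use 10 parallel ceph connections (striper/ioctx/cluster objects)
    ceph.nbconnections 10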