[XrdCeph] Implemented pools of ceph objects
Allow the use of multiple ceph objects in parallel when talking to ceph, to work around the limitation that a single thread deals with async answers per object.
In order to use this feature, one has to add a line to the xrootd config file:
   ceph.nbconnections <n>
By default, only one object will be used.
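For instance, to spread the load over ten striper/ioctx/cluster triplets (the value 10 is only illustrative; the parsing below accepts any value between 1 and 100):

   ceph.nbconnections 10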
Sebastien Ponce committed Jul 11, 2016
1 parent 5bc8bb3 commit b30cb1e
Showing 3 changed files with 161 additions and 73 deletions.
48 changes: 48 additions & 0 deletions src/XrdCeph/XrdCephOss.cc
@@ -30,6 +30,7 @@
#include "XrdOuc/XrdOucEnv.hh"
#include "XrdSys/XrdSysError.hh"
#include "XrdOuc/XrdOucTrace.hh"
#include "XrdOuc/XrdOucStream.hh"
#include "XrdVersion.hh"
#include "XrdCeph/XrdCephOss.hh"
#include "XrdCeph/XrdCephOssDir.hh"
@@ -77,6 +78,53 @@ XrdCephOss::~XrdCephOss() {
ceph_posix_disconnect_all();
}

// declared and used in XrdCephPosix.cc
extern unsigned int g_maxCephPoolIdx;
int XrdCephOss::Configure(const char *configfn, XrdSysError &Eroute, XrdOucEnv *envP) {
int NoGo = 0;
XrdOucEnv myEnv;
XrdOucStream Config(&Eroute, getenv("XRDINSTANCE"), &myEnv, "=====> ");
// If there is no config file, nothing to be done
if (configfn && *configfn) {
// Try to open the configuration file.
int cfgFD;
if ((cfgFD = open(configfn, O_RDONLY, 0)) < 0) {
Eroute.Emsg("Config", errno, "open config file", configfn);
return 1;
}
Config.Attach(cfgFD);
// Now start reading records until eof.
char *var;
while((var = Config.GetMyFirstWord())) {
if (!strncmp(var, "ceph.nbconnections", 18)) {
var = Config.GetWord();
if (var) {
unsigned long value = strtoul(var, 0, 10);
if (value > 0 and value <= 100) {
g_maxCephPoolIdx = value;
break;
} else {
Eroute.Emsg("Config", "Invalid value for ceph.nbconnections in config file (must be between 1 and 100)", configfn, var);
return 1;
}
} else {
Eroute.Emsg("Config", "Missing value for ceph.nbconnections in config file", configfn);
return 1;
}
}
}

// Now check if any errors occurred during file i/o
int retc = Config.LastError();
if (retc) {
NoGo = Eroute.Emsg("Config", -retc, "read config file",
configfn);
}
Config.Close();
}
return NoGo;
}

int XrdCephOss::Chmod(const char *path, mode_t mode, XrdOucEnv *envP) {
return -ENOTSUP;
}
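To illustrate the parsing in Configure above (hypothetical config lines; the outcomes follow directly from the code):

   ceph.nbconnections 10   → accepted, g_maxCephPoolIdx becomes 10
   ceph.nbconnections 0    → rejected (out of range), Configure returns 1
   ceph.nbconnections      → rejected, the missing value is reported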
2 changes: 2 additions & 0 deletions src/XrdCeph/XrdCephOss.hh
@@ -55,6 +55,8 @@ public:
XrdCephOss();
virtual ~XrdCephOss();

int Configure(const char *, XrdSysError &, XrdOucEnv *envP);

virtual int Chmod(const char *, mode_t mode, XrdOucEnv *eP=0);
virtual int Create(const char *, const char *, mode_t, XrdOucEnv &, int opts=0);
virtual int Init(XrdSysLogger *, const char*);
184 changes: 111 additions & 73 deletions src/XrdCeph/XrdCephPosix.cc
@@ -41,6 +41,7 @@
#include <sys/xattr.h>
#include <time.h>
#include <limits>
#include <pthread.h>
#include "XrdSfs/XrdSfsAio.hh"
#include "XrdSys/XrdSysPthread.hh"

@@ -78,12 +79,22 @@ struct AioArgs {
ceph::bufferlist *bl;
};

- /// global variables holding stripers and ioCtxs for each ceph pool plus the cluster object
- std::map<std::string, libradosstriper::RadosStriper*> g_radosStripers;
- std::map<std::string, librados::IoCtx*> g_ioCtx;
- librados::Rados* g_cluster = 0;
+ /// global variables holding stripers/ioCtxs/cluster objects
+ /// Note that we have a pool of them to circumvent the limitation
+ /// of having a single objecter/messenger per IoCtx
+ typedef std::map<std::string, libradosstriper::RadosStriper*> StriperDict;
+ std::vector<StriperDict> g_radosStripers;
+ typedef std::map<std::string, librados::IoCtx*> IOCtxDict;
+ std::vector<IOCtxDict> g_ioCtx;
+ std::vector<librados::Rados*> g_cluster;
/// mutex protecting the striper and ioctx maps
XrdSysMutex g_striper_mutex;
/// index of current Striper/IoCtx to be used
unsigned int g_cephPoolIdx = 0;
/// size of the Striper/IoCtx pool, defaults to 1
/// may be overwritten in the configuration file
/// (See XrdCephOss::Configure)
unsigned int g_maxCephPoolIdx = 1;

/// global variable holding a list of files currently opened for write
std::multiset<std::string> g_filesOpenForWrite;
@@ -94,6 +105,27 @@ unsigned int g_nextCephFd = 0;
/// mutex protecting the map of file descriptors and the openForWrite multiset
XrdSysMutex g_fd_mutex;

/// Accessor to next ceph pool index
/// Note that this is not thread safe, but we do not care
/// as we only want a rough load balancing
unsigned int getCephPoolIdxAndIncrease() {
if (g_radosStripers.size() == 0) {
// initialization phase : allocate corresponding places in the vectors
for (unsigned int i = 0; i < g_maxCephPoolIdx; i++) {
g_radosStripers.push_back(StriperDict());
g_ioCtx.push_back(IOCtxDict());
g_cluster.push_back(0);
}
}
unsigned int res = g_cephPoolIdx;
unsigned nextValue = g_cephPoolIdx+1;
if (nextValue >= g_maxCephPoolIdx) {
nextValue = 0;
}
g_cephPoolIdx = nextValue;
return res;
}
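Should strict round-robin ever be needed, the counter could be made lock-free. A minimal sketch of such a variant (hypothetical, not part of this commit; the lazy vector initialization above is left out):

   #include <atomic>

   static std::atomic<unsigned int> g_cephPoolIdxAtomic(0);

   unsigned int getCephPoolIdxAtomic() {
     // fetch_add hands each caller a distinct ticket; the modulo then
     // maps it into [0, g_maxCephPoolIdx)
     return g_cephPoolIdxAtomic.fetch_add(1) % g_maxCephPoolIdx;
   }

The racy version above is cheaper and good enough for rough load balancing, which is presumably why the commit keeps it.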

/// check whether a file is open for write
bool isOpenForWrite(std::string& name) {
XrdSysMutexHelper lock(g_fd_mutex);
@@ -369,75 +401,70 @@ static CephFileRef getCephFileRef(const char *path, XrdOucEnv *env, int flags,
return fr;
}

- static libradosstriper::RadosStriper* getRadosStriperNoLock(const CephFile& file) {
- std::stringstream ss;
- ss << file.userId << '@' << file.pool << ',' << file.nbStripes << ','
- << file.stripeUnit << ',' << file.objectSize;
- std::string userAtPool = ss.str();
- std::map<std::string, libradosstriper::RadosStriper*>::iterator it =
- g_radosStripers.find(userAtPool);
- if (it == g_radosStripers.end()) {
+ int checkAndCreateStriper(unsigned int cephPoolIdx, std::string &userAtPool, const CephFile& file) {
+ StriperDict &sDict = g_radosStripers[cephPoolIdx];
+ StriperDict::iterator it = sDict.find(userAtPool);
+ if (it == sDict.end()) {
// we need to create a new radosStriper
// Do we already have a cluster
- if (0 == g_cluster) {
+ if (0 == g_cluster[cephPoolIdx]) {
// create connection to cluster
- g_cluster = new librados::Rados;
- if (0 == g_cluster) {
+ librados::Rados *cluster = new librados::Rados;
+ if (0 == cluster) {
return 0;
}
- int rc = g_cluster->init(file.userId.c_str());
+ int rc = cluster->init(file.userId.c_str());
if (rc) {
- delete g_cluster;
- g_cluster = 0;
+ delete cluster;
return 0;
}
- rc = g_cluster->conf_read_file(NULL);
+ rc = cluster->conf_read_file(NULL);
if (rc) {
- g_cluster->shutdown();
- delete g_cluster;
- g_cluster = 0;
+ cluster->shutdown();
+ delete cluster;
return 0;
}
- g_cluster->conf_parse_env(NULL);
- rc = g_cluster->connect();
+ cluster->conf_parse_env(NULL);
+ rc = cluster->connect();
if (rc) {
- g_cluster->shutdown();
- delete g_cluster;
- g_cluster = 0;
+ cluster->shutdown();
+ delete cluster;
return 0;
}
+ g_cluster[cephPoolIdx] = cluster;
}
// create IoCtx for our pool
librados::IoCtx *ioctx = new librados::IoCtx;
if (0 == ioctx) {
- g_cluster->shutdown();
- delete g_cluster;
+ g_cluster[cephPoolIdx]->shutdown();
+ delete g_cluster[cephPoolIdx];
+ g_cluster[cephPoolIdx] = 0;
return 0;
}
- int rc = g_cluster->ioctx_create(file.pool.c_str(), *ioctx);
+ int rc = g_cluster[cephPoolIdx]->ioctx_create(file.pool.c_str(), *ioctx);
if (rc != 0) {
- g_cluster->shutdown();
- delete g_cluster;
- g_cluster = 0;
+ g_cluster[cephPoolIdx]->shutdown();
+ delete g_cluster[cephPoolIdx];
+ g_cluster[cephPoolIdx] = 0;
delete ioctx;
return 0;
}
// create RadosStriper connection
libradosstriper::RadosStriper *striper = new libradosstriper::RadosStriper;
if (0 == striper) {
delete ioctx;
- g_cluster->shutdown();
- delete g_cluster;
- g_cluster = 0;
+ g_cluster[cephPoolIdx]->shutdown();
+ delete g_cluster[cephPoolIdx];
+ g_cluster[cephPoolIdx] = 0;
return 0;
}
rc = libradosstriper::RadosStriper::striper_create(*ioctx, striper);
if (rc != 0) {
delete striper;
delete ioctx;
- g_cluster->shutdown();
- delete g_cluster;
- g_cluster = 0;
+ g_cluster[cephPoolIdx]->shutdown();
+ delete g_cluster[cephPoolIdx];
+ g_cluster[cephPoolIdx] = 0;
return 0;
}
// setup layout
@@ -446,72 +473,83 @@ static libradosstriper::RadosStriper* getRadosStriperNoLock(const CephFile& file
logwrapper((char*)"getRadosStriper : invalid nbStripes %d", file.nbStripes);
delete striper;
delete ioctx;
- g_cluster->shutdown();
- delete g_cluster;
- g_cluster = 0;
+ g_cluster[cephPoolIdx]->shutdown();
+ delete g_cluster[cephPoolIdx];
+ g_cluster[cephPoolIdx] = 0;
return 0;
}
rc = striper->set_object_layout_stripe_unit(file.stripeUnit);
if (rc != 0) {
logwrapper((char*)"getRadosStriper : invalid stripeUnit %d (must be non0, multiple of 64K)", file.stripeUnit);
delete striper;
delete ioctx;
- g_cluster->shutdown();
- delete g_cluster;
- g_cluster = 0;
+ g_cluster[cephPoolIdx]->shutdown();
+ delete g_cluster[cephPoolIdx];
+ g_cluster[cephPoolIdx] = 0;
return 0;
}
rc = striper->set_object_layout_object_size(file.objectSize);
if (rc != 0) {
logwrapper((char*)"getRadosStriper : invalid objectSize %d (must be non 0, multiple of stripe_unit)", file.objectSize);
delete striper;
delete ioctx;
- g_cluster->shutdown();
- delete g_cluster;
- g_cluster = 0;
+ g_cluster[cephPoolIdx]->shutdown();
+ delete g_cluster[cephPoolIdx];
+ g_cluster[cephPoolIdx] = 0;
return 0;
}
- g_ioCtx.insert(std::pair<std::string, librados::IoCtx*>(userAtPool, ioctx));
- it = g_radosStripers.insert(std::pair<std::string, libradosstriper::RadosStriper*>
- (userAtPool, striper)).first;
+ IOCtxDict & ioDict = g_ioCtx[cephPoolIdx];
+ ioDict.insert(std::pair<std::string, librados::IoCtx*>(userAtPool, ioctx));
+ sDict.insert(std::pair<std::string, libradosstriper::RadosStriper*>
+ (userAtPool, striper)).first;
}
- return it->second;
- }
+ return 1;
+ }

static libradosstriper::RadosStriper* getRadosStriper(const CephFile& file) {
XrdSysMutexHelper lock(g_striper_mutex);
- return getRadosStriperNoLock(file);
+ std::stringstream ss;
+ ss << file.userId << '@' << file.pool << ',' << file.nbStripes << ','
+ << file.stripeUnit << ',' << file.objectSize;
+ std::string userAtPool = ss.str();
+ unsigned int cephPoolIdx = getCephPoolIdxAndIncrease();
+ if (checkAndCreateStriper(cephPoolIdx, userAtPool, file) == 0) {
+ return 0;
+ }
+ return g_radosStripers[cephPoolIdx][userAtPool];
}
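As a concrete illustration of the key built above (all values made up): a file accessed as user "xrd" in pool "data" with 2 stripes, a 4 MiB stripe unit and a 4 MiB object size would be looked up under "xrd@data,2,4194304,4194304", so every distinct user/pool/layout combination gets its own striper within the selected pool slot.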

static librados::IoCtx* getIoCtx(const CephFile& file) {
XrdSysMutexHelper lock(g_striper_mutex);
- libradosstriper::RadosStriper *striper = getRadosStriperNoLock(file);
- if (0 == striper) {
- return 0;
- }
std::stringstream ss;
ss << file.userId << '@' << file.pool << ',' << file.nbStripes << ','
<< file.stripeUnit << ',' << file.objectSize;
std::string userAtPool = ss.str();
- return g_ioCtx[userAtPool];
+ unsigned int cephPoolIdx = getCephPoolIdxAndIncrease();
+ if (checkAndCreateStriper(cephPoolIdx, userAtPool, file) == 0) {
+ return 0;
+ }
+ return g_ioCtx[cephPoolIdx][userAtPool];
}

void ceph_posix_disconnect_all() {
XrdSysMutexHelper lock(g_striper_mutex);
- for (std::map<std::string, libradosstriper::RadosStriper*>::iterator it =
- g_radosStripers.begin();
- it != g_radosStripers.end();
- it++) {
- delete it->second;
+ for (unsigned int i= 0; i < g_maxCephPoolIdx; i++) {
+ for (StriperDict::iterator it2 = g_radosStripers[i].begin();
+ it2 != g_radosStripers[i].end();
+ it2++) {
+ delete it2->second;
+ }
+ for (IOCtxDict::iterator it2 = g_ioCtx[i].begin();
+ it2 != g_ioCtx[i].end();
+ it2++) {
+ delete it2->second;
+ }
+ delete g_cluster[i];
}
g_radosStripers.clear();
- for (std::map<std::string, librados::IoCtx*>::iterator it = g_ioCtx.begin();
- it != g_ioCtx.end();
- it++) {
- delete it->second;
- }
g_ioCtx.clear();
- delete g_cluster;
+ g_cluster.clear();
}

void ceph_posix_set_logfunc(void (*logfunc) (char *, va_list argp)) {
@@ -667,7 +705,7 @@ ssize_t ceph_aio_write(int fd, XrdSfsAio *aiop, AioCB *cb) {
// prepare a ceph AioCompletion object and do async call
AioArgs *args = new AioArgs(aiop, cb, count);
librados::AioCompletion *completion =
- g_cluster->aio_create_completion(args, ceph_aio_write_complete, NULL);
+ g_cluster[getCephPoolIdxAndIncrease()]->aio_create_completion(args, ceph_aio_write_complete, NULL);
int rc = striper->aio_write(fr->name, completion, bl, count, offset);
completion->release();
return rc;
@@ -753,7 +791,7 @@ ssize_t ceph_aio_read(int fd, XrdSfsAio *aiop, AioCB *cb) {
// prepare a ceph AioCompletion object and do async call
AioArgs *args = new AioArgs(aiop, cb, count, bl);
librados::AioCompletion *completion =
- g_cluster->aio_create_completion(args, ceph_aio_read_complete, NULL);
+ g_cluster[getCephPoolIdxAndIncrease()]->aio_create_completion(args, ceph_aio_read_complete, NULL);
int rc = striper->aio_read(fr->name, completion, bl, count, offset);
completion->release();
return rc;
@@ -996,7 +1034,7 @@ void ceph_posix_freexattrlist(XrdSysXAttr::AList *aPL) {
int ceph_posix_statfs(long long *totalSpace, long long *freeSpace) {
logwrapper((char*)"ceph_posix_statfs");
librados::cluster_stat_t result;
- int rc = g_cluster->cluster_stat(result);
+ int rc = g_cluster[getCephPoolIdxAndIncrease()]->cluster_stat(result);
if (0 == rc) {
*totalSpace = result.kb * 1024;
*freeSpace = result.kb_avail * 1024;
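Taken together, the commit boils down to the pattern below (a condensed, illustrative sketch, not the actual XrdCeph code; connection setup, the striper/ioctx maps and all error handling are omitted):

   #include <vector>
   #include <rados/librados.hpp>

   static unsigned int g_maxCephPoolIdx = 1;       // set from ceph.nbconnections
   static std::vector<librados::Rados*> g_cluster; // one cluster handle per slot

   // deliberately racy: rough round-robin over the pool is all that is needed
   static unsigned int nextIdx() {
     static unsigned int idx = 0;
     unsigned int res = idx;
     idx = (idx + 1) % g_maxCephPoolIdx;
     return res;
   }

Every I/O path then turns the former single g_cluster-> call into g_cluster[nextIdx()]->, so concurrent requests fan out over up to g_maxCephPoolIdx objecter/messenger instances.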
