Skip to content

Commit

Permalink
Make oss spaces used for data and metadata files configurable.
Browse files Browse the repository at this point in the history
  oss.localroot  /sdd/localroot
  oss.space data /raid/cache
  oss.space meta /sdd/cinfo

  pfc.spaces     data meta

Also allow users to specify sleep time during consecutive purge
operations (default is 300 s).

  pfc.diskusage 168g 180g sleep 1800
  • Loading branch information
osschar committed Sep 14, 2016
1 parent f5a804f commit 3dcee51
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 117 deletions.
28 changes: 21 additions & 7 deletions src/XrdFileCache/XrdFileCache.hh
Original file line number Diff line number Diff line change
Expand Up @@ -50,30 +50,44 @@ namespace XrdFileCache
{
Configuration() :
m_hdfsmode(false),
m_data_space("public"),
m_meta_space("public"),
m_diskUsageLWM(-1),
m_diskUsageHWM(-1),
m_purgeInterval(300),
m_bufferSize(1024*1024),
m_RamAbsAvailable(0),
m_NRamBuffers(-1),
m_prefetch_max_blocks(10),
m_hdfsbsize(128*1024*1024) {}
m_hdfsbsize(128*1024*1024)
{}

bool m_hdfsmode; //!< flag for enabling block-level operation
std::string m_cache_dir; //!< path of disk cache
std::string m_username; //!< username passed to oss plugin
std::string m_data_space; //!< oss space for data files
std::string m_meta_space; //!< oss space for metadata files (cinfo)

long long m_diskUsageLWM; //!< cache purge low water mark
long long m_diskUsageHWM; //!< cache purge high water mark
int m_purgeInterval; //!< sleep interval between cache purges

long long m_bufferSize; //!< prefetch buffer size, default 1MB
long long m_RamAbsAvailable; //!< available from configuration
long long m_RamAbsAvailable; //!< available from configuration
int m_NRamBuffers; //!< number of total in-memory cache blocks, cached
size_t m_prefetch_max_blocks;//!< maximum number of blocks to prefetch per file

long long m_hdfsbsize; //!< used with m_hdfsmode, default 128MB
};

struct TmpConfiguration
{
std::string m_diskUsageLWM;
std::string m_diskUsageHWM;

TmpConfiguration() :
m_diskUsageLWM("0.90"), m_diskUsageHWM("0.95")
{}
};

//----------------------------------------------------------------------------
//! Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Expand Down Expand Up @@ -181,15 +195,15 @@ namespace XrdFileCache
XrdOss* GetOss() const { return m_output_fs; }

XrdSysError& GetSysError() { return m_log; }

File* GetFileWithLocalPath(std::string, IO* io);

void AddActive(IO*, File*);


XrdOucTrace* GetTrace() { return m_trace; }

private:
bool ConfigParameters(std::string, XrdOucStream&);
bool ConfigParameters(std::string, XrdOucStream&, TmpConfiguration &tmpc);
bool ConfigXeq(char *, XrdOucStream &);
bool xdlib(XrdOucStream &);
bool xtrace(XrdOucStream &);
Expand Down
177 changes: 105 additions & 72 deletions src/XrdFileCache/XrdFileCacheConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,11 @@ bool Cache::Config(XrdSysLogger *logger, const char *config_filename, const char
&XrdVERSIONINFOVAR(XrdOucGetCache2));
if (!ofsCfg) return false;

TmpConfiguration tmpc;

if (ofsCfg->Load(XrdOfsConfigPI::theOssLib)) {
if (ofsCfg->Load(XrdOfsConfigPI::theOssLib))
{
ofsCfg->Plugin(m_output_fs);
XrdOssCache_FS* ocfs = XrdOssCache::Find("public");
ocfs->Add(m_configuration.m_cache_dir.c_str());
}
else
{
Expand Down Expand Up @@ -163,10 +163,10 @@ bool Cache::Config(XrdSysLogger *logger, const char *config_filename, const char
}
else if (!strncmp(var,"pfc.", 4))
{
retval = ConfigParameters(std::string(var+4), Config);
retval = ConfigParameters(std::string(var+4), Config, tmpc);
}

if (!retval)
if ( ! retval)
{
retval = false;
TRACE(Error, "Cache::Config() error in parsing");
Expand All @@ -176,21 +176,50 @@ bool Cache::Config(XrdSysLogger *logger, const char *config_filename, const char
}

Config.Close();

// sets default value for disk usage
if (m_configuration.m_diskUsageLWM < 0 || m_configuration.m_diskUsageHWM < 0)
{
XrdOssVSInfo sP;
if (m_output_fs->StatVS(&sP, "public", 1) >= 0) {
m_configuration.m_diskUsageLWM = static_cast<long long>(0.90 * sP.Total + 0.5);
m_configuration.m_diskUsageHWM = static_cast<long long>(0.95 * sP.Total + 0.5);
if (m_output_fs->StatVS(&sP, m_configuration.m_data_space.c_str(), 1) < 0)
{
m_log.Emsg("Cache::ConfigParameters() error obtaining stat info for space ", m_configuration.m_data_space.c_str());
return false;
}

if (::isalpha(*(tmpc.m_diskUsageLWM.rbegin())) && ::isalpha(*(tmpc.m_diskUsageHWM.rbegin())))
{
if (XrdOuca2x::a2sz(m_log, "Error getting disk usage low watermark", tmpc.m_diskUsageLWM.c_str(), &m_configuration.m_diskUsageLWM, 0, sP.Total) ||
XrdOuca2x::a2sz(m_log, "Error getting disk usage high watermark", tmpc.m_diskUsageHWM.c_str(), &m_configuration.m_diskUsageHWM, 0, sP.Total))
{
return false;
}
}
else
{
char* eP;
errno = 0;
double lwmf = strtod(tmpc.m_diskUsageLWM.c_str(), &eP);
if (errno || eP == tmpc.m_diskUsageLWM.c_str())
{
m_log.Emsg("Cache::ConfigParameters() error parsing diskusage parameter ", tmpc.m_diskUsageLWM.c_str());
return false;
}
double hwmf = strtod(tmpc.m_diskUsageHWM.c_str(), &eP);
if (errno || eP == tmpc.m_diskUsageHWM.c_str()) {
m_log.Emsg("Cache::ConfigParameters() error parsing diskusage parameter ", tmpc.m_diskUsageHWM.c_str());
return false;
}

m_configuration.m_diskUsageLWM = static_cast<long long>(sP.Total * lwmf + 0.5);
m_configuration.m_diskUsageHWM = static_cast<long long>(sP.Total * hwmf + 0.5);
}
}

// get number of available RAM blocks after process configuration
if (m_configuration.m_RamAbsAvailable == 0 )
if (m_configuration.m_RamAbsAvailable == 0)
{
TRACE(Error, "RAM usage not specified. Please set pfc.ram value in configuration file.");
return false;
TRACE(Error, "RAM usage not specified. Please set pfc.ram value in configuration file.");
return false;
}
m_configuration.m_NRamBuffers = static_cast<int>(m_configuration.m_RamAbsAvailable/ m_configuration.m_bufferSize);

Expand All @@ -204,15 +233,22 @@ bool Cache::Config(XrdSysLogger *logger, const char *config_filename, const char
char buff[2048];
float rg = (m_configuration.m_RamAbsAvailable)/float(1024*1024*1024);
loff = snprintf(buff, sizeof(buff), "Config effective %s pfc configuration:\n"
" pfc.blocksize %lld\n"
" pfc.prefetch %ld\n"
" pfc.ram %.fg"
" pfc.trace %d",
config_filename,
m_configuration.m_bufferSize,
m_configuration.m_prefetch_max_blocks, // AMT not sure what parsing should be
rg,
m_trace->What);
" pfc.blocksize %lld\n"
" pfc.prefetch %ld\n"
" pfc.ram %.fg\n"
" pfc.diskusage %lld %lld sleep %d\n"
" pfc.spaces %s %s\n"
" pfc.trace %d",
config_filename,
m_configuration.m_bufferSize,
m_configuration.m_prefetch_max_blocks, // AMT not sure what parsing should be
rg,
m_configuration.m_diskUsageLWM,
m_configuration.m_diskUsageHWM,
m_configuration.m_purgeInterval,
m_configuration.m_data_space.c_str(),
m_configuration.m_meta_space.c_str(),
m_trace->What);

if (m_configuration.m_hdfsmode)
{
Expand All @@ -223,12 +259,12 @@ bool Cache::Config(XrdSysLogger *logger, const char *config_filename, const char

char unameBuff[256];
if (m_configuration.m_username.empty()) {
XrdOucUtils::UserName(getuid(), unameBuff, sizeof(unameBuff));
m_configuration.m_username = unameBuff;
XrdOucUtils::UserName(getuid(), unameBuff, sizeof(unameBuff));
m_configuration.m_username = unameBuff;
}
else {
snprintf(unameBuff, sizeof(unameBuff), "\tpfc.user %s \n", m_configuration.m_username.c_str());
loff += snprintf(&buff[loff], strlen(unameBuff), "%s", unameBuff);
snprintf(unameBuff, sizeof(unameBuff), "\tpfc.user %s \n", m_configuration.m_username.c_str());
loff += snprintf(&buff[loff], strlen(unameBuff), "%s", unameBuff);
}

m_log.Say( buff);
Expand All @@ -237,69 +273,50 @@ bool Cache::Config(XrdSysLogger *logger, const char *config_filename, const char
m_log.Say("------ File Caching Proxy interface initialization ", retval ? "completed" : "failed");

if (ofsCfg) delete ofsCfg;

return retval;
}

//______________________________________________________________________________


bool Cache::ConfigParameters(std::string part, XrdOucStream& config )
bool Cache::ConfigParameters(std::string part, XrdOucStream& config, TmpConfiguration &tmpc)
{
XrdSysError err(0, "");
if ( part == "user" )
{
m_configuration.m_username = config.GetWord();
}
else if ( part == "cachedir" )
{
m_configuration.m_cache_dir = config.GetWord();
}
else if ( part == "diskusage" )
{
std::string minV = config.GetWord();
std::string maxV = config.GetWord();
if (!minV.empty() && !maxV.empty()) {
XrdOssVSInfo sP;
if (m_output_fs->StatVS(&sP, "public", 1) >= 0)
tmpc.m_diskUsageLWM = config.GetWord();
tmpc.m_diskUsageHWM = config.GetWord();

if (tmpc.m_diskUsageHWM.empty())
{
m_log.Emsg("Config", "Error: diskusage parameter requires two arguments.");
return false;
}
const char *p = config.GetWord();
if (p && strcmp(p, "sleep") == 0)
{
p = config.GetWord();
if (XrdOuca2x::a2i(m_log, "Error getting purge interval", p, &m_configuration.m_purgeInterval, 60, 3600))
{
if (::isalpha(*(minV.rbegin())) && ::isalpha(*(minV.rbegin()))) {
if ( XrdOuca2x::a2sz(m_log, "Error getting disk usage low watermark", minV.c_str(), &m_configuration.m_diskUsageLWM, 0, sP.Total)
|| XrdOuca2x::a2sz(m_log, "Error getting disk usage high watermark", maxV.c_str(), &m_configuration.m_diskUsageHWM, 0, sP.Total))
{
return false;
}
}
else
{
char* eP;
errno = 0;
float lwmf = strtod(minV.c_str(), &eP);
if (errno || eP == minV.c_str()) {
m_log.Emsg("Factory::ConfigParameters() error parsing diskusage parameter ", minV.c_str());
return false;
}
float hwmf = strtod(maxV.c_str(), &eP);
if (errno || eP == maxV.c_str()) {
m_log.Emsg("Factory::ConfigParameters() error parsing diskusage parameter ", maxV.c_str());
return false;
}

m_configuration.m_diskUsageLWM = static_cast<long long>(sP.Total * lwmf + 0.5);
m_configuration.m_diskUsageHWM = static_cast<long long>(sP.Total * hwmf + 0.5);
}
return false;
}
}
}
else if ( part == "blocksize" )
{
long long minBSize = 64 * 1024;
long long maxBSize = 16 * 1024 * 1024;
if ( XrdOuca2x::a2sz(m_log, "get block size", config.GetWord(), &m_configuration.m_bufferSize, minBSize, maxBSize))
if (XrdOuca2x::a2sz(m_log, "get block size", config.GetWord(), &m_configuration.m_bufferSize, minBSize, maxBSize))
{
return false;
}
}
else if (part == "prefetch" )
else if ( part == "prefetch" )
{
const char* params = config.GetWord();
if (params) {
Expand All @@ -318,31 +335,47 @@ bool Cache::ConfigParameters(std::string part, XrdOucStream& config )
return false;
}
}
else if (part == "ram" )
else if ( part == "ram" )
{
long long minRAM = 1024* 1024 * 1024;;
long long maxRAM = 100 * minRAM;
long long minRAM = 1024 * 1024 * 1024;
long long maxRAM = 256 * minRAM;
if ( XrdOuca2x::a2sz(m_log, "get RAM available", config.GetWord(), &m_configuration.m_RamAbsAvailable, minRAM, maxRAM))
{
return false;
}
}
else if ( part == "spaces" )
{
const char *par;
par = config.GetWord();
if (par) m_configuration.m_data_space = par;
par = config.GetWord();
if (par) m_configuration.m_meta_space = par;
else
{
m_log.Emsg("Config", "spacenames requires two parameters: <data-space> <metadata-space>.");
return false;
}
}
else if ( part == "hdfsmode" )
{
m_configuration.m_hdfsmode = true;

const char* params = config.GetWord();
if (params) {
if (!strncmp("hdfsbsize", params, 9)) {
long long minBlSize = 128 * 1024;
long long maxBlSize = 1024 * 1024 * 1024;
const char* params = config.GetWord();
if (params)
{
if (!strncmp("hdfsbsize", params, 9))
{
long long minBlSize = 32 * 1024;
long long maxBlSize = 128 * 1024 * 1024;
params = config.GetWord();
if ( XrdOuca2x::a2sz(m_log, "Error getting file fragment size", params, &m_configuration.m_hdfsbsize, minBlSize, maxBlSize))
{
return false;
}
}
else {
else
{
m_log.Emsg("Config", "Error setting the fragment size parameter name");
return false;
}
Expand All @@ -354,7 +387,7 @@ bool Cache::ConfigParameters(std::string part, XrdOucStream& config )
return false;
}

assert ( config.GetWord() == 0 && "Cache::ConfigParameters() lost argument");
assert (config.GetWord() == 0 && "Cache::ConfigParameters() lost argument");

return true;
}
4 changes: 3 additions & 1 deletion src/XrdFileCache/XrdFileCacheFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ bool File::Open()

// Create the data file itself.
char size_str[16]; sprintf(size_str, "%lld", m_fileSize);
myEnv.Put("oss.asize", size_str);
myEnv.Put("oss.asize", size_str);
myEnv.Put("oss.cgroup", Cache::GetInstance().RefConfiguration().m_data_space.c_str());
if (myOss.Create(myUser, m_temp_filename.c_str(), 0600, myEnv, XRDOSS_mkpath) != XrdOssOK)
{
TRACEF(Error, "File::Open() Create failed for data file " << m_temp_filename
Expand All @@ -243,6 +244,7 @@ bool File::Open()
bool fileExisted = (myOss.Stat(ifn.c_str(), &infoStat) == XrdOssOK);

myEnv.Put("oss.asize", "64k"); // MT-XXX Calculate? Do not know length of access lists ...
myEnv.Put("oss.cgroup", Cache::GetInstance().RefConfiguration().m_meta_space.c_str());
if (myOss.Create(myUser, ifn.c_str(), 0600, myEnv, XRDOSS_mkpath) != XrdOssOK)
{
TRACEF(Error, "File::Open() Create failed for info file " << ifn
Expand Down
2 changes: 1 addition & 1 deletion src/XrdFileCache/XrdFileCacheIOEntireFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ IOEntireFile::IOEntireFile(XrdOucCacheIO2 *io, XrdOucCacheStats &stats, Cache &
m_localStat(0)
{
XrdCl::URL url(GetInput()->Path());
std::string fname = Cache::GetInstance().RefConfiguration().m_cache_dir + url.GetPath();
std::string fname = url.GetPath();

m_file = Cache::GetInstance().GetFileWithLocalPath(fname, this);
if (m_file)
Expand Down
2 changes: 1 addition & 1 deletion src/XrdFileCache/XrdFileCacheIOFileBlock.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ void IOFileBlock::GetBlockSizeFromPath()
File* IOFileBlock::newBlockFile(long long off, int blocksize)
{
XrdCl::URL url(GetInput()->Path());
std::string fname = Cache::GetInstance().RefConfiguration().m_cache_dir + url.GetPath();
std::string fname = url.GetPath();

std::stringstream ss;
ss << fname;
Expand Down

0 comments on commit 3dcee51

Please sign in to comment.