Skip to content

Commit

Permalink
Merge pull request #527 from alja/master
Browse files Browse the repository at this point in the history
pfc: change buffer and ram limits in case of client caching, add parameter for disk flush frequency
  • Loading branch information
abh3 committed Jun 7, 2017
2 parents 6f74742 + 9080d5c commit 6532504
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 18 deletions.
7 changes: 4 additions & 3 deletions src/XrdFileCache/XrdFileCache.cc
Expand Up @@ -147,7 +147,8 @@ Cache::Cache() : XrdOucCache(),
m_trace(0),
m_traceID("Manager"),
m_prefetch_condVar(0),
m_RAMblocks_used(0)
m_RAMblocks_used(0),
m_isClient(false)
{
m_trace = new XrdOucTrace(&m_log);
// default log level is Warning
Expand All @@ -167,7 +168,7 @@ XrdOucCacheIO2 *Cache::Attach(XrdOucCacheIO2 *io, int Options)
else
cio = new IOEntireFile(io, m_stats, *this);

TRACE_PC(Info, const char* loc = io->Location(),
TRACE_PC(Debug, const char* loc = io->Location(),
"Cache::Attach() " << io->Path() << " location: " <<
((loc && loc[0] != 0) ? loc : "<deferred open>"));
return cio;
Expand Down Expand Up @@ -340,7 +341,7 @@ void Cache::schedule_file_sync(File* f, bool ref_cnt_already_set)
{
DiskSyncer* ds = new DiskSyncer(f);
if ( ! ref_cnt_already_set) inc_ref_cnt(f, true);
if (isClient) ds->DoIt();
if (m_isClient) ds->DoIt();
else if (schedP) schedP->Schedule(ds);
else {pthread_t tid;
XrdSysThread::Run(&tid, callDoIt, ds, 0, "DiskSyncer");
Expand Down
9 changes: 6 additions & 3 deletions src/XrdFileCache/XrdFileCache.hh
Expand Up @@ -60,7 +60,8 @@ struct Configuration
m_RamAbsAvailable(0),
m_NRamBuffers(-1),
m_prefetch_max_blocks(10),
m_hdfsbsize(128*1024*1024)
m_hdfsbsize(128*1024*1024),
m_flushCnt(100)
{}

bool m_hdfsmode; //!< flag for enabling block-level operation
Expand All @@ -79,15 +80,17 @@ struct Configuration
size_t m_prefetch_max_blocks; //!< maximum number of blocks to prefetch per file

long long m_hdfsbsize; //!< used with m_hdfsmode, default 128MB
long long m_flushCnt; //!< nuber of unsynced blcoks on disk before flush is called
};

struct TmpConfiguration
{
std::string m_diskUsageLWM;
std::string m_diskUsageHWM;
std::string m_flushRaw;

TmpConfiguration() :
m_diskUsageLWM("0.90"), m_diskUsageHWM("0.95")
m_diskUsageLWM("0.90"), m_diskUsageHWM("0.95"), m_flushRaw("100")
{}
};

Expand Down Expand Up @@ -238,7 +241,7 @@ private:

XrdSysMutex m_RAMblock_mutex; //!< central lock for this class
int m_RAMblocks_used;
bool isClient; //!< True if running as client
bool m_isClient; //!< True if running as client

struct WriteQ
{
Expand Down
49 changes: 39 additions & 10 deletions src/XrdFileCache/XrdFileCacheConfiguration.cc
Expand Up @@ -110,7 +110,7 @@ bool Cache::Config(XrdSysLogger *logger, const char *config_filename, const char

// Indicate whether or not we are a client instance
//
isClient = (strncmp("*client ", theINS, 8) != 0);
m_isClient = (strncmp("*client ", theINS, 8) != 0);

XrdOucEnv myEnv;
XrdOucStream Config(&m_log, theINS, &myEnv, "=====> ");
Expand Down Expand Up @@ -148,6 +148,11 @@ bool Cache::Config(XrdSysLogger *logger, const char *config_filename, const char
return false;
}

// minimize buffersize in case of client caching
if ( m_isClient) {
m_configuration.m_bufferSize = 256 * 1024 * 124;
}


// Actual parsing of the config file.
bool retval = true;
Expand Down Expand Up @@ -221,15 +226,33 @@ bool Cache::Config(XrdSysLogger *logger, const char *config_filename, const char
}
}

// sets flush frequency
{
if (::isalpha(*(tmpc.m_flushRaw.rbegin())))
{
if (XrdOuca2x::a2sz(m_log, "Error getting number of blocks to flush", tmpc.m_flushRaw.c_str(), &m_configuration.m_flushCnt, 100*m_configuration.m_bufferSize , 5000*m_configuration.m_bufferSize))
{
return false;
}
m_configuration.m_flushCnt /= m_configuration.m_bufferSize;
}
else
{
m_configuration.m_flushCnt = ::atol(tmpc.m_flushRaw.c_str());
}
}


// get number of available RAM blocks after process configuration
if (m_configuration.m_RamAbsAvailable == 0)
{
TRACE(Error, "RAM usage not specified. pfc.ram is a required configuration directive since release 4.6.\n"
" As a temporary measure default of 8 GB is being used. This will be discontinued in release 5.");
m_configuration.m_RamAbsAvailable = 8ll * 1024 * 1024 * 1024;
// return false;
m_configuration.m_RamAbsAvailable = m_isClient ? 256ll * 1024 * 1024 : 1024 * 1024 * 1024;
char buff2[1024];
snprintf(buff2, sizeof(buff2), "RAM usage is not specified. Default value %s is used.", m_isClient ? "256m" : "8g");
TRACE(Warning, buff2);
}
m_configuration.m_NRamBuffers = static_cast<int>(m_configuration.m_RamAbsAvailable/ m_configuration.m_bufferSize);


// Set tracing to debug if this is set in environment
char* cenv = getenv("XRDDEBUG");
Expand All @@ -246,7 +269,8 @@ bool Cache::Config(XrdSysLogger *logger, const char *config_filename, const char
" pfc.ram %.fg\n"
" pfc.diskusage %lld %lld sleep %d\n"
" pfc.spaces %s %s\n"
" pfc.trace %d",
" pfc.trace %d\n"
" pfc.flush %lld",
config_filename,
m_configuration.m_bufferSize,
m_configuration.m_prefetch_max_blocks,
Expand All @@ -256,7 +280,10 @@ bool Cache::Config(XrdSysLogger *logger, const char *config_filename, const char
m_configuration.m_purgeInterval,
m_configuration.m_data_space.c_str(),
m_configuration.m_meta_space.c_str(),
m_trace->What);
m_trace->What,
m_configuration.m_flushCnt);



if (m_configuration.m_hdfsmode)
{
Expand Down Expand Up @@ -361,7 +388,7 @@ bool Cache::ConfigParameters(std::string part, XrdOucStream& config, TmpConfigur
}
else if ( part == "ram" )
{
long long minRAM = 1024 * 1024 * 1024;
long long minRAM = m_isClient ? 256 * 1024 * 1024 : 1024 * 1024 * 1024;
long long maxRAM = 256 * minRAM;
if ( XrdOuca2x::a2sz(m_log, "get RAM available", config.GetWord(), &m_configuration.m_RamAbsAvailable, minRAM, maxRAM))
{
Expand Down Expand Up @@ -409,14 +436,16 @@ bool Cache::ConfigParameters(std::string part, XrdOucStream& config, TmpConfigur
}
}
}
else if ( part == "flush" )
{
tmpc.m_flushRaw = config.GetWord();
}
else
{
m_log.Emsg("Cache::ConfigParameters() unmatched pfc parameter", part.c_str());
return false;
}

assert (config.GetWord() == 0 && "Cache::ConfigParameters() lost argument");

return true;
}

Expand Down
2 changes: 1 addition & 1 deletion src/XrdFileCache/XrdFileCacheFile.cc
Expand Up @@ -747,7 +747,7 @@ void File::WriteBlockToDisk(Block* b)
{
m_cfi.SetBitSynced(pfIdx);
++m_non_flushed_cnt;
if (m_non_flushed_cnt >= 100)
if (m_non_flushed_cnt >= Cache::GetInstance().RefConfiguration().m_flushCnt)
{
schedule_sync = true;
m_in_sync = true;
Expand Down
2 changes: 1 addition & 1 deletion src/XrdFileCache/XrdFileCachePurge.cc
Expand Up @@ -97,7 +97,7 @@ void FillFileMapRecurse( XrdOssDF* iOssDF, const std::string& path, FPurgeState&
{
// cinfo file does not contain any known accesses, use stat.mtime instead.

TRACE(Warning, "FillFileMapRecurse() could not get access time for " << np << ", trying stat");
TRACE(Debug, "FillFileMapRecurse() could not get access time for " << np << ", trying stat");

XrdOss* oss = Cache::GetInstance().GetOss();
struct stat fstat;
Expand Down

0 comments on commit 6532504

Please sign in to comment.