Skip to content

Commit

Permalink
Merge pull request #373 from alja/trace
Browse files Browse the repository at this point in the history
pfc-V2: use XrdOucTrace for logging
  • Loading branch information
abh3 committed Jun 8, 2016
2 parents 5370374 + 7b8fb09 commit 8350c5e
Show file tree
Hide file tree
Showing 15 changed files with 301 additions and 203 deletions.
2 changes: 2 additions & 0 deletions src/XrdFileCache/README
Expand Up @@ -129,6 +129,8 @@ pfc.osslib <lpath> [<params>] path to alternative plign for output file system

pfc.decisionlib <lpath> [<prams>] path to decision library and plugin parameters

pfc.trace <none|error|warning|info|debug|dump> default level is none, xrootd option -d sets debug level

Examples

a) Enable proxy file prefetching:
Expand Down
36 changes: 17 additions & 19 deletions src/XrdFileCache/XrdFileCache.cc
Expand Up @@ -28,18 +28,19 @@
#include "XrdOss/XrdOss.hh"
#include "XrdOuc/XrdOucEnv.hh"
#include "XrdOuc/XrdOucUtils.hh"
#include "XrdOuc/XrdOucTrace.hh"

#include "XrdFileCache.hh"
#include "XrdFileCacheTrace.hh"
#include "XrdFileCacheInfo.hh"
#include "XrdFileCacheIOEntireFile.hh"
#include "XrdFileCacheIOFileBlock.hh"

using namespace XrdFileCache;



Cache * Cache::m_factory = NULL;


void *CacheDirCleanupThread(void* cache_void)
{
Cache::GetInstance().CacheDirCleanup();
Expand Down Expand Up @@ -131,9 +132,12 @@ bool Cache::Decide(XrdOucCacheIO* io)

Cache::Cache() : XrdOucCache(),
m_log(0, "XrdFileCache_"),
m_trace(0),
m_traceID("Manager"),
m_prefetch_condVar(0),
m_RAMblocks_used(0)
{
m_trace = new XrdOucTrace(&m_log);
}

//______________________________________________________________________________
Expand All @@ -142,18 +146,19 @@ XrdOucCacheIO2 *Cache::Attach(XrdOucCacheIO2 *io, int Options)
{
if (Cache::GetInstance().Decide(io))
{
clLog()->Info(XrdCl::AppMsg, "Cache::Attach() %s", io->Path());
TRACE(Debug, "Cache::Attach() " << io->Path());
IO* cio;
if (Cache::GetInstance().RefConfiguration().m_hdfsmode)
cio = new IOFileBlock(io, m_stats, *this);
else
cio = new IOEntireFile(io, m_stats, *this);

TRACE(Info, "Cache::Attach() " << io->Path());
return cio;
}
else
{
clLog()->Info(XrdCl::AppMsg, "Cache::Attach() reject %s", io->Path());
TRACE(Info, "Cache::Attach() decision decline " << io->Path());
}
return io;
}
Expand All @@ -168,7 +173,7 @@ int Cache::isAttached()

void Cache::Detach(XrdOucCacheIO* io)
{
clLog()->Info(XrdCl::AppMsg, "Cache::Detach() %s", io->Path());
TRACE(Info, "Cache::Detach() " << io->Path());

// Cache owns File objects
XrdSysMutexHelper lock(&m_active_mutex);
Expand Down Expand Up @@ -196,15 +201,15 @@ Cache::HaveFreeWritingSlots()
return true;
}
else {
XrdCl::DefaultEnv::GetLog()->Info(XrdCl::AppMsg, "Cache::HaveFreeWritingSlots() negative", m_writeQ.size);
TRACE(Dump, "Cache::HaveFreeWritingSlots() negative " << m_writeQ.size);
return false;
}
}
//______________________________________________________________________________
void
Cache::AddWriteTask(Block* b, bool fromRead)
{
XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg, "Cache::AddWriteTask() bOff=%ld", b->m_offset);
TRACE(Dump, "Cache::AddWriteTask() bOff=%ld " << b->m_offset);
m_writeQ.condVar.Lock();
if (fromRead)
m_writeQ.queue.push_back(b);
Expand All @@ -224,10 +229,9 @@ void Cache::RemoveWriteQEntriesFor(File *iFile)
{
if ((*i)->m_file == iFile)
{

XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg, "Cache::Remove entries for %p path %s", (void*)(*i), iFile->lPath());
TRACE(Dump, "Cache::Remove entries for " << (void*)(*i) << " path " << iFile->lPath());
std::list<Block*>::iterator j = i++;
iFile->BlockRemovedFromWriteQ(*j);
iFile->BlockRemovedFromWriteQ(*j);
m_writeQ.queue.erase(j);
--m_writeQ.size;
}
Expand All @@ -253,7 +257,7 @@ Cache::ProcessWriteTasks()
Block* block = m_writeQ.queue.front(); // AMT should not be back ???
m_writeQ.queue.pop_front();
m_writeQ.size--;
XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg, "Cache::ProcessWriteTasks for %p path %s", (void*)(block), block->m_file->lPath());
TRACE(Dump, "Cache::ProcessWriteTasks for %p " << (void*)(block) << " path " << block->m_file->lPath());
m_writeQ.condVar.UnLock();

block->m_file->WriteBlockToDisk(block);
Expand Down Expand Up @@ -328,13 +332,10 @@ Cache::RegisterPrefetchFile(File* file)

if (Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks)
{

XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg, "Cache::Register new file BEGIN");
m_prefetch_condVar.Lock();
m_prefetchList.push_back(file);
m_prefetch_condVar.Signal();
m_prefetch_condVar.UnLock();
XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg, "Cache::Register new file End");
}
}

Expand Down Expand Up @@ -395,7 +396,7 @@ Cache::Prepare(const char *url, int oflags, mode_t mode)
struct stat buf;
int res = m_output_fs->Stat(spath.c_str(), &buf);
if (res == 0) {
XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg, "Cache::Prefetch defer open %s", spath.c_str());
TRACE( Dump, "Cache::Prefetch defer open " << spath);
return 1;
}
else {
Expand Down Expand Up @@ -429,7 +430,7 @@ int Cache::Stat(const char *curl, struct stat &sbuff)
name += ".cinfo";
int res = infoFile->Open(name.c_str(), O_RDONLY, 0600, myEnv);
if (res >= 0) {
Info info(0);
Info info(m_trace, 0);
if (info.Read(infoFile) > 0) {
sbuff.st_size = info.GetFileSize();
success = true;
Expand All @@ -451,9 +452,6 @@ void
Cache::Prefetch()
{
int limitRAM= Cache::GetInstance().RefConfiguration().m_NRamBuffers * 0.7;

XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg, "Cache::Prefetch thread start");

while (true) {
m_RAMblock_mutex.Lock();
bool doPrefetch = (m_RAMblocks_used < limitRAM && HaveFreeWritingSlots());
Expand Down
10 changes: 7 additions & 3 deletions src/XrdFileCache/XrdFileCache.hh
Expand Up @@ -30,6 +30,7 @@

class XrdOucStream;
class XrdSysError;
class XrdOucTrace;

namespace XrdCl {
class Log;
Expand Down Expand Up @@ -190,13 +191,19 @@ namespace XrdFileCache

void AddActive(IO*, File*);


XrdOucTrace* GetTrace() { return m_trace; }
private:
bool ConfigParameters(std::string, XrdOucStream&);
bool ConfigXeq(char *, XrdOucStream &);
bool xdlib(XrdOucStream &);
bool xtrace(XrdOucStream &);
static Cache *m_factory; //!< this object

XrdSysError m_log; //!< XrdFileCache namespace logger
XrdOucTrace* m_trace;
const char* m_traceID;

XrdOucCacheStats m_stats; //!<
XrdOss *m_output_fs; //!< disk cache file system

Expand All @@ -206,9 +213,6 @@ namespace XrdFileCache

Configuration m_configuration; //!< configurable parameters

//! Short log alias.
XrdCl::Log* clLog() const { return XrdCl::DefaultEnv::GetLog(); }

XrdSysCondVar m_prefetch_condVar; //!< central lock for this class

XrdSysMutex m_RAMblock_mutex; //!< central lock for this class
Expand Down
68 changes: 54 additions & 14 deletions src/XrdFileCache/XrdFileCacheConfiguration.cc
@@ -1,4 +1,5 @@
#include "XrdFileCache.hh"
#include "XrdFileCacheTrace.hh"

#include "XrdOss/XrdOss.hh"
#include "XrdOss/XrdOssCache.hh"
Expand Down Expand Up @@ -33,7 +34,7 @@ bool Cache::xdlib(XrdOucStream &Config)
std::string libp;
if (!(val = Config.GetWord()) || !val[0])
{
clLog()->Info(XrdCl::AppMsg, " Cache::Config() decisionlib not specified; always caching files");
TRACE(Info," Cache::Config() decisionlib not specified; always caching files");
return true;
}
else
Expand All @@ -54,17 +55,48 @@ bool Cache::xdlib(XrdOucStream &Config)
Decision * d = ep(m_log);
if (!d)
{
clLog()->Error(XrdCl::AppMsg, "Cache::Config() decisionlib was not able to create a decision object");
TRACE(Error, "Cache::Config() decisionlib was not able to create a decision object");
return false;
}
if (params)
d->ConfigDecision(params);

m_decisionpoints.push_back(d);
clLog()->Info(XrdCl::AppMsg, "Cache::Config() successfully created decision lib from %s", libp.c_str());
return true;
}

/* Function: xtrace
Purpose: To parse the directive: trace <level>
Output: true upon success or false upon failure.
*/
bool Cache::xtrace(XrdOucStream &Config)
{
char *val;
static struct traceopts {const char *opname; int opval;} tropts[] =
{
{"none", 0},
{"error", 1},
{"warning", 2},
{"info", 3},
{"debug", 4},
{"dump", 5}
};
int numopts = sizeof(tropts)/sizeof(struct traceopts);

if (!(val = Config.GetWord()))
{m_log.Emsg("Config", "trace option not specified"); return 1;}

for (int i = 0; i < numopts; i++)
{
if (!strcmp(val, tropts[i].opname)) {
m_trace->What = tropts[i].opval;
return true;
}
}
return false;
}

//______________________________________________________________________________

bool Cache::Config(XrdSysLogger *logger, const char *config_filename, const char *parameters)
Expand All @@ -80,14 +112,14 @@ bool Cache::Config(XrdSysLogger *logger, const char *config_filename, const char

if (!config_filename || !*config_filename)
{
clLog()->Warning(XrdCl::AppMsg, "Cache::Config() configuration file not specified.");
TRACE(Error, "Cache::Config() configuration file not specified.");
return false;
}

int fd;
if ( (fd = open(config_filename, O_RDONLY, 0)) < 0)
{
clLog()->Error(XrdCl::AppMsg, "Cache::Config() can't open configuration file %s", config_filename);
TRACE( Error, "Cache::Config() can't open configuration file " << config_filename);
return false;
}

Expand All @@ -106,7 +138,7 @@ bool Cache::Config(XrdSysLogger *logger, const char *config_filename, const char
}
else
{
clLog()->Error(XrdCl::AppMsg, "Cache::Config() Unable to create an OSS object");
TRACE(Error, "Cache::Config() Unable to create an OSS object");
m_output_fs = 0;
return false;
}
Expand All @@ -123,7 +155,11 @@ bool Cache::Config(XrdSysLogger *logger, const char *config_filename, const char
}
else if (!strcmp(var,"pfc.decisionlib"))
{
xdlib(Config);
retval = xdlib(Config);
}
else if (!strcmp(var,"pfc.trace"))
{
retval = xtrace(Config);
}
else if (!strncmp(var,"pfc.", 4))
{
Expand All @@ -133,7 +169,7 @@ bool Cache::Config(XrdSysLogger *logger, const char *config_filename, const char
if (!retval)
{
retval = false;
clLog()->Error(XrdCl::AppMsg, "Cache::Config() error in parsing");
TRACE(Error, "Cache::Config() error in parsing");
break;
}

Expand All @@ -147,18 +183,21 @@ bool Cache::Config(XrdSysLogger *logger, const char *config_filename, const char
if (m_output_fs->StatVS(&sP, "public", 1) >= 0) {
m_configuration.m_diskUsageLWM = static_cast<long long>(0.90 * sP.Total + 0.5);
m_configuration.m_diskUsageHWM = static_cast<long long>(0.95 * sP.Total + 0.5);
clLog()->Debug(XrdCl::AppMsg, "Default disk usage [%lld, %lld]", m_configuration.m_diskUsageLWM, m_configuration.m_diskUsageHWM);
}
}

// get number of available RAM blocks after process configuration
if (m_configuration.m_RamAbsAvailable == 0 )
{
m_log.Emsg("Error", "RAM usage not specified. Please set pfc.ram value in configuration file.");
return false;
TRACE(Error, "RAM usage not specified. Please set pfc.ram value in configuration file.");
return false;
}
m_configuration.m_NRamBuffers = static_cast<int>(m_configuration.m_RamAbsAvailable/ m_configuration.m_bufferSize);

// Set tracing to debug if this is set in environment
char* cenv = getenv("XRDDEBUG");
if (cenv && !strcmp(cenv,"1")) m_trace->What = 4;

if (retval)
{
int loff = 0;
Expand All @@ -167,12 +206,13 @@ bool Cache::Config(XrdSysLogger *logger, const char *config_filename, const char
loff = snprintf(buff, sizeof(buff), "Config effective %s pfc configuration:\n"
" pfc.blocksize %lld\n"
" pfc.prefetch %ld\n"
" pfc.ram %.fg",
" pfc.ram %.fg"
" pfc.trace %d",
config_filename,
m_configuration.m_bufferSize,
m_configuration.m_prefetch_max_blocks, // AMT not sure what parsing should be
rg);

rg,
m_trace->What);

if (m_configuration.m_hdfsmode)
{
Expand Down

0 comments on commit 8350c5e

Please sign in to comment.