Skip to content

Commit

Permalink
*[Proxy]* Make cache I/O synchronization tunable.
Browse files Browse the repository at this point in the history
  • Loading branch information
abh3 committed Jun 28, 2017
1 parent f8d398a commit 847c8be
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 167 deletions.
45 changes: 45 additions & 0 deletions src/XrdOuc/XrdOucPsx.cc
Expand Up @@ -285,6 +285,7 @@ bool XrdOucPsx::Parse(char *var, XrdOucStream &Config, XrdSysError &eDest)
TS_Xeq("memcache", ParseCache); // Backward compatibility
TS_Xeq("cache", ParseCache);
TS_Xeq("cachelib", ParseCLib);
TS_Xeq("ciosync", ParseCio);
TS_Xeq("inetmode", ParseINet);
TS_Xeq("namelib", ParseNLib);
TS_Xeq("setopt", ParseSet);
Expand Down Expand Up @@ -458,6 +459,50 @@ char *XrdOucPsx::ParseCache(XrdSysError *Eroute, XrdOucStream &Config, char *pBu
return val;
}

/******************************************************************************/
/* P a r s e C i o */
/******************************************************************************/

/* Function: ParseCio
Purpose: To parse the directive: ciosync <tsec> <tries>
<tsec> the number of seconds between each sync attempt.
<tries> the maximum number of tries before giving up.
Output: true upon success or false upon failure.
*/

bool XrdOucPsx::ParseCio(XrdSysError *Eroute, XrdOucStream &Config)
{
char *val;
int tsec, mtry;

// Get the try seconds
//
if (!(val = Config.GetWord()) || !val[0])
{Eroute->Emsg("Config", "ciosync parameter not specified"); return false;}

// Convert to seconds
//
if (XrdOuca2x::a2i(*Eroute,"ciosync interval",val,&tsec,10)) return false;

// Get the max seconds
//
if (!(val = Config.GetWord()) || !val[0])
{Eroute->Emsg("Config", "max time not specified"); return false;}

// Convert to seconds
//
if (XrdOuca2x::a2i(*Eroute,"ciosync max time",val,&mtry,2)) return false;

// Set values and return success
//
cioWait = tsec;
cioTries = mtry;
return true;
}

/******************************************************************************/
/* P a r s e C L i b */
/******************************************************************************/
Expand Down
6 changes: 5 additions & 1 deletion src/XrdOuc/XrdOucPsx.hh
Expand Up @@ -52,6 +52,8 @@ bool ConfigSetup(XrdSysError &eDest, bool hush=false);

bool ParseCache(XrdSysError *Eroute, XrdOucStream &Config);

bool ParseCio(XrdSysError *Eroute, XrdOucStream &Config);

bool ParseCLib(XrdSysError *Eroute, XrdOucStream &Config);

bool ParseINet(XrdSysError *Eroute, XrdOucStream &Config);
Expand All @@ -73,14 +75,16 @@ XrdOucTList *setLast;
int maxRHCB;
int traceLvl;
int debugLvl;
int cioWait;
int cioTries;
bool useV4;
bool xLfn2Pfn;
bool xPfn2Lfn;

XrdOucPsx(XrdVersionInfo *vInfo, const char *cfn)
: theN2N(0), theCache(0), theCache2(0), mCache(0),
setFirst(0), setLast(0), maxRHCB(0),
traceLvl(0), debugLvl(0),
traceLvl(0), debugLvl(0), cioWait(0), cioTries(0),
useV4(false), xLfn2Pfn(false), xPfn2Lfn(false),
LocalRoot(0), RemotRoot(0), N2NLib(0), N2NParms(0),
cPath(0), cParm(0), configFN(strdup(cfn)),
Expand Down
9 changes: 9 additions & 0 deletions src/XrdPosix/XrdPosixConfig.cc
Expand Up @@ -64,6 +64,8 @@ extern XrdOucName2Name *theN2N;
extern XrdCl::DirListFlags::Flags dlFlag;
extern XrdSysLogger *theLogger;
extern XrdSysTrace Trace;
extern int ddInterval;
extern int ddMaxTries;
extern bool oidsOK;
};

Expand Down Expand Up @@ -243,6 +245,13 @@ void XrdPosixConfig::SetConfig(XrdOucPsx &parms)
//
if (parms.maxRHCB > 0) XrdPosixFileRH::SetMax(parms.maxRHCB);

// Set delayed destro parameters if present
//
if (parms.cioWait > 0 && parms.cioTries > 0)
{XrdPosixGlobals::ddMaxTries = (parms.cioTries < 2 ? 2 : parms.cioTries);
XrdPosixGlobals::ddInterval = (parms.cioWait < 10 ? 10 : parms.cioWait);
}

// Handle the caching options
//
if (parms.theCache2)
Expand Down
17 changes: 9 additions & 8 deletions src/XrdPosix/XrdPosixFile.cc
Expand Up @@ -54,6 +54,9 @@ namespace XrdPosixGlobals
{
extern XrdOucCache2 *theCache;
extern XrdOucName2Name *theN2N;
extern int ddInterval;
extern int ddMaxTries;
int ddNumLost = 0;
};

namespace
Expand Down Expand Up @@ -146,9 +149,6 @@ void* XrdPosixFile::DelayedDestroy(void* vpf)
// file cannot be closed in a clean fashion for some reason.
//
EPNAME("DDestroy");
static const int ddInterval = 30;
static const int maxTries = (3*60)/ddInterval;
static int numLost = 0;

XrdCl::XRootDStatus Status;
const char *eTxt;
Expand All @@ -159,7 +159,7 @@ void* XrdPosixFile::DelayedDestroy(void* vpf)
// Wait for active I/O to complete
//
do{if (doWait)
{XrdSysTimer::Snooze(ddInterval);
{XrdSysTimer::Snooze(XrdPosixGlobals::ddInterval);
doWait = false;
} else {
ddSem.Wait();
Expand All @@ -175,7 +175,8 @@ do{if (doWait)

// Do some debugging
//
DEBUG("DLY destory of "<<ddCount<<" objects; "<<numLost<<" already lost.");
DEBUG("DLY destory of "<<ddCount<<" objects; "<<XrdPosixGlobals::ddNumLost
<<" already lost.");

// Try to delete all the files on the list. If we exceeded the try limit,
// remove the file from the list and let it sit forever.
Expand All @@ -187,10 +188,10 @@ do{if (doWait)
else eTxt = Status.ToString().c_str();
} else eTxt = (ioActive ? "active I/O" : "callback");

if (fCurr->numTries > maxTries)
{numLost++; ddCount--;
if (fCurr->numTries > XrdPosixGlobals::ddMaxTries)
{XrdPosixGlobals::ddNumLost++; ddCount--;
DMSG("DDestroy", eTxt <<" timeout closing " <<fCurr->Origin()
<<numLost <<" objects lost");
<<' ' <<XrdPosixGlobals::ddNumLost <<" objects lost");
fCurr->nextFile = ddLost;
ddLost = fCurr;
fCurr->Close(Status);
Expand Down
2 changes: 2 additions & 0 deletions src/XrdPosix/XrdPosixXrootd.cc
Expand Up @@ -86,6 +86,8 @@ XrdCl::DirListFlags::Flags dlFlag = XrdCl::DirListFlags::None;
XrdSysLogger *theLogger = 0;
XrdSysTrace Trace("Posix", 0,
(getenv("XRDPOSIX_DEBUG") ? TRACE_Debug : 0));
int ddInterval= 30;
int ddMaxTries= 180/30;
bool oidsOK = false;
};

Expand Down
6 changes: 0 additions & 6 deletions src/XrdPss/XrdPss.hh
Expand Up @@ -202,16 +202,10 @@ int ConfigProc(const char *ConfigFN);
int ConfigXeq(char*, XrdOucStream&);
const
char *getDomain(const char *hName);
int xcach(XrdSysError *Eroute, XrdOucStream &Config);
int xcacl(XrdSysError *Eroute, XrdOucStream &Config);
int xconf(XrdSysError *Eroute, XrdOucStream &Config);
int xdef( XrdSysError *Eroute, XrdOucStream &Config);
int xexp( XrdSysError *Eroute, XrdOucStream &Config);
int xinet(XrdSysError *errp, XrdOucStream &Config);
int xperm(XrdSysError *errp, XrdOucStream &Config);
int xorig(XrdSysError *errp, XrdOucStream &Config);
int xsopt(XrdSysError *Eroute, XrdOucStream &Config);
int xtrac(XrdSysError *Eroute, XrdOucStream &Config);
int xnml (XrdSysError *Eroute, XrdOucStream &Config);
};
#endif
167 changes: 15 additions & 152 deletions src/XrdPss/XrdPssConfig.cc
Expand Up @@ -80,6 +80,9 @@

#define TS_Xeq(x,m) if (!strcmp(x,var)) return m(&eDest, Config);

#define TS_PSX(x,m) if (!strcmp(x,var)) \
return (psxConfig->m(&eDest, Config) ? 0 : 1);

/*******x**********************************************************************/
/* G l o b a l s */
/******************************************************************************/
Expand Down Expand Up @@ -436,18 +439,19 @@ int XrdPssSys::ConfigXeq(char *var, XrdOucStream &Config)

// Process items. for either a local or a remote configuration
//
TS_Xeq("memcache", xcach); // Backward compatibility
TS_Xeq("cache", xcach);
TS_Xeq("cachelib", xcacl);
TS_PSX("namelib", ParseNLib);
TS_PSX("memcache", ParseCache); // Backward compatibility
TS_PSX("cache", ParseCache);
TS_PSX("cachelib", ParseCLib);
TS_PSX("ciosync", ParseCio);
TS_Xeq("config", xconf);
TS_Xeq("inetmode", xinet);
TS_Xeq("origin", xorig);
TS_Xeq("permit", xperm);
TS_Xeq("setopt", xsopt);
TS_Xeq("trace", xtrac);
TS_Xeq("namelib", xnml);
TS_Xeq("defaults", xdef);
TS_Xeq("export", xexp);
TS_PSX("inetmode", ParseINet);
TS_Xeq("origin", xorig);
TS_Xeq("permit", xperm);
TS_PSX("setopt", ParseSet);
TS_PSX("trace", ParseTrace);

// Copy the variable name as this may change because it points to an
// internal buffer in Config. The vagaries of effeciency. Then get value.
Expand Down Expand Up @@ -481,59 +485,6 @@ const char *XrdPssSys::getDomain(const char *hName)
return hName;
}

/******************************************************************************/
/* x c a c h */
/******************************************************************************/

/* Function: xcach
Purpose: To parse the directive: cache <keyword> <value> [...]
<keyword> is one of the following:
debug {0 | 1 | 2}
logstats enables stats logging
max2cache largest read to cache (can be suffixed with k, m, g).
minpages smallest number of pages allowed (default 256)
mode {r | w}
pagesize size of each cache page (can be suffixed with k, m, g).
preread [minpages [minrdsz]] [perf nn [recalc]]
r/w enables caching for files opened read/write.
sfiles {on | off | .<sfx>}
size size of cache in bytes (can be suffixed with k, m, g).
Output: 0 upon success or 1 upon failure.
*/

int XrdPssSys::xcach(XrdSysError *Eroute, XrdOucStream &Config)
{

// We parse this using the configurator
//
return (psxConfig->ParseCache(Eroute, Config) ? 0 : 1);
}

/******************************************************************************/
/* x c a c l */
/******************************************************************************/

/* Function: xcacl
Purpose: To parse the directive: cachelib {<path>|default} [<parms>]
<path> the path of the cache library to be used.
<parms> optional parms to be passed
Output: 0 upon success or !0 upon failure.
*/

int XrdPssSys::xcacl(XrdSysError *Eroute, XrdOucStream &Config)
{

// We parse this using the configurator
//
return (psxConfig->ParseCLib(Eroute, Config) ? 0 : 1);
}

/******************************************************************************/
/* x c o n f */
/******************************************************************************/
Expand All @@ -543,7 +494,8 @@ int XrdPssSys::xcacl(XrdSysError *Eroute, XrdOucStream &Config)
Purpose: To parse the directive: config <keyword> <value>
<keyword> is one of the following:
workers number of queue workers > 0
streams number of i/o streams
workers number of queue workers
Output: 0 upon success or 1 upon failure.
*/
Expand Down Expand Up @@ -631,51 +583,6 @@ int XrdPssSys::xexp(XrdSysError *Eroute, XrdOucStream &Config)
if (*(pP->Path()) == '*') XrdPosixConfig::setOids(true);
return 0;
}

/******************************************************************************/
/* x i n e t */
/******************************************************************************/

/* Function: xinet
Purpose: To parse the directive: inetmode v4 | v6
v4 use only IPV4 addresses to connect to servers.
v6 use IPV4 mapped addresses or IPV6 addresses, as needed.
Output: 0 upon success or !0 upon failure.
*/

int XrdPssSys::xinet(XrdSysError *Eroute, XrdOucStream &Config)
{

// We parse this using the configurator
//
return (psxConfig->ParseINet(Eroute, Config) ? 0 : 1);
}

/******************************************************************************/
/* x n m l */
/******************************************************************************/

/* Function: xnml
Purpose: To parse the directive: namelib [<opts>] pfn<path> [<parms>]
<opts> one or more: [-lfn2pfn] [-lfncache]
<path> the path of the filesystem library to be used.
<parms> optional parms to be passed
Output: 0 upon success or !0 upon failure.
*/

int XrdPssSys::xnml(XrdSysError *Eroute, XrdOucStream &Config)
{

// We parse this using the configurator
//
return (psxConfig->ParseNLib(Eroute, Config) ? 0 : 1);
}

/******************************************************************************/
/* x o r i g */
Expand Down Expand Up @@ -798,47 +705,3 @@ do {if (!(val = Config.GetWord()))

return 0;
}

/******************************************************************************/
/* x s o p t */
/******************************************************************************/

/* Function: xsopt
Purpose: To parse the directive: setopt <keyword> <value>
<keyword> is an XrdClient option keyword.
<value> is the value the option is to have.
Output: 0 upon success or !0 upon failure.
*/

int XrdPssSys::xsopt(XrdSysError *Eroute, XrdOucStream &Config)
{

// We parse this using the configurator
//
return (psxConfig->ParseSet(Eroute, Config) ? 0 : 1);
}

/******************************************************************************/
/* x t r a c e */
/******************************************************************************/

/* Function: xtrace
Purpose: To parse the directive: trace <events>
<events> the blank separated list of events to trace. Trace
directives are cummalative.
Output: retc upon success or -EINVAL upon failure.
*/

int XrdPssSys::xtrac(XrdSysError *Eroute, XrdOucStream &Config)
{

// We parse this using the configurator
//
return (psxConfig->ParseTrace(Eroute, Config) ? 0 : 1);
}

0 comments on commit 847c8be

Please sign in to comment.