From 5d2ba41196fc83547474b775c4eedfc823a13fbe Mon Sep 17 00:00:00 2001 From: Andrew Hanushevsky Date: Mon, 6 May 2019 21:18:10 -0700 Subject: [PATCH] [Server] Implement simple g-stream monitoring for medium level repotring. --- src/XrdFfs/XrdFfsMisc.cc | 4 +- src/XrdHeaders.cmake | 4 + src/XrdOss/XrdOss.hh | 31 ++++ src/XrdOss/XrdOssApi.cc | 29 ++- src/XrdOuc/XrdOucCache.hh | 8 +- src/XrdOuc/XrdOucCache2.hh | 12 +- src/XrdOuc/XrdOucCacheData.cc | 4 +- src/XrdOuc/XrdOucCacheStats.hh | 8 +- src/XrdOuc/XrdOucPsx.cc | 16 +- src/XrdOuc/XrdOucPsx.hh | 5 +- src/XrdPosix/XrdPosixXrootd.cc | 103 ----------- src/XrdPosix/XrdPosixXrootd.hh | 14 -- src/XrdPss/XrdPss.cc | 20 +- src/XrdPss/XrdPssConfig.cc | 10 +- src/XrdServer.cmake | 2 + src/XrdXrootd/XrdXrootdConfig.cc | 53 +++++- src/XrdXrootd/XrdXrootdGSReal.cc | 286 +++++++++++++++++++++++++++++ src/XrdXrootd/XrdXrootdGSReal.hh | 103 +++++++++++ src/XrdXrootd/XrdXrootdGStream.cc | 70 +++++++ src/XrdXrootd/XrdXrootdGStream.hh | 154 ++++++++++++++++ src/XrdXrootd/XrdXrootdMonData.hh | 15 ++ src/XrdXrootd/XrdXrootdMonFile.cc | 22 +-- src/XrdXrootd/XrdXrootdMonFile.hh | 6 +- src/XrdXrootd/XrdXrootdMonitor.cc | 19 +- src/XrdXrootd/XrdXrootdMonitor.hh | 13 +- src/XrdXrootd/XrdXrootdProtocol.hh | 1 + 26 files changed, 812 insertions(+), 200 deletions(-) create mode 100644 src/XrdXrootd/XrdXrootdGSReal.cc create mode 100644 src/XrdXrootd/XrdXrootdGSReal.hh create mode 100644 src/XrdXrootd/XrdXrootdGStream.cc create mode 100644 src/XrdXrootd/XrdXrootdGStream.hh diff --git a/src/XrdFfs/XrdFfsMisc.cc b/src/XrdFfs/XrdFfsMisc.cc index 3c89285476f..4306763b69a 100644 --- a/src/XrdFfs/XrdFfsMisc.cc +++ b/src/XrdFfs/XrdFfsMisc.cc @@ -52,7 +52,7 @@ #include "XrdFfs/XrdFfsMisc.hh" #include "XrdFfs/XrdFfsPosix.hh" #include "XrdFfs/XrdFfsQueue.hh" -#include "XrdPosix/XrdPosixXrootd.hh" +#include "XrdPosix/XrdPosixConfig.hh" #ifdef __cplusplus extern "C" { @@ -305,7 +305,7 @@ void XrdFfsMisc_xrd_init(const char *rdrurl, const char *urlcachelife, int start // EnvPutInt(NAME_READAHEADSIZE,0); // EnvPutInt(NAME_READCACHESIZE,0); // EnvPutInt(NAME_REQUESTTIMEOUT, 30); - XrdPosixXrootd::setEnv("WorkerThreads", 50); + XrdPosixConfig::SetEnv("WorkerThreads", 50); if (getenv("XROOTDFS_SECMOD") != NULL && !strcmp(getenv("XROOTDFS_SECMOD"), "sss")) XrdFfsMisc_xrd_secsss_init(); diff --git a/src/XrdHeaders.cmake b/src/XrdHeaders.cmake index eb96f402102..66aaa22f4be 100644 --- a/src/XrdHeaders.cmake +++ b/src/XrdHeaders.cmake @@ -70,6 +70,10 @@ set( XROOTD_PUBLIC_HEADERS XrdSys/XrdSysXAttr.hh XrdSys/XrdSysXSLock.hh XrdXml/XrdXmlReader.hh + XrdXrootd/XrdXrootdMonData.hh + XrdXrootd/XrdXrootdGStream.hh + XrdXrootd/XrdXrootdBridge.hh + XrdHttp/XrdHttpSecXtractor.hh ) if( NOT XRDCL_ONLY ) diff --git a/src/XrdOss/XrdOss.hh b/src/XrdOss/XrdOss.hh index 9d2a41d9d0c..35dec0135c8 100644 --- a/src/XrdOss/XrdOss.hh +++ b/src/XrdOss/XrdOss.hh @@ -246,6 +246,8 @@ virtual ~XrdOss() {} //! @param parms -> Any parameters specified after the path on the //! ofs.osslib directive. If there are no parameters, the //! pointer may be zero. +//! @param envP -> **Version2 Only** pointer to environmental info. +//! This pointer may be nil if no such information exists. //! //! @return Success: -> an instance of the XrdOss object to be used as the //! underlying storage system. @@ -254,11 +256,40 @@ virtual ~XrdOss() {} //! The object creation function must be declared as an extern "C" function //! in the plug-in shared library as follows: //------------------------------------------------------------------------------ + +//------------------------------------------------------------------------------ +//! The typedef that describes the XRdOssStatInfoInit external. +//------------------------------------------------------------------------------ + +typedef XrdOss *(*XrdOssGetStorageSystem_t) (XrdOss *native_oss, + XrdSysLogger *Logger, + const char *config_fn, + const char *parms); + +typedef XrdOss *(*XrdOssGetStorageSystem2_t)(XrdOss *native_oss, + XrdSysLogger *Logger, + const char *config_fn, + const char *parms, + XrdOucEnv *envP); + /*! extern "C" XrdOss *XrdOssGetStorageSystem(XrdOss *native_oss, XrdSysLogger *Logger, const char *config_fn, const char *parms); + + An alternate entry point may be defined in lieu of the previous entry point. + The plug-in loaer looks for this entry point first before reverting to the + older version 1 entry point/ Version 2 differs in that an extra parameter, + the environmental pointer, is passed. Note that this pointer is also + supplied via the EnvInfo() method. This, many times, is not workable as + environmental information is needed as initialization time. + + extern "C" XrdOss *XrdOssGetStorageSystem2(XrdOss *native_oss, + XrdSysLogger *Logger, + const char *config_fn, + const char *parms, + XrdOucEnv *envP); */ //------------------------------------------------------------------------------ diff --git a/src/XrdOss/XrdOssApi.cc b/src/XrdOss/XrdOssApi.cc index 0a6b30a3d52..923e366a2d9 100644 --- a/src/XrdOss/XrdOssApi.cc +++ b/src/XrdOss/XrdOssApi.cc @@ -104,7 +104,6 @@ XrdOss *XrdOssGetSS(XrdSysLogger *Logger, const char *config_fn, extern XrdSysError OssEroute; XrdOucPinLoader *myLib; XrdOss *ossP; - XrdOss *(*ep)(XrdOss *, XrdSysLogger *, const char *, const char *); // Verify that versions are compatible. // @@ -123,16 +122,28 @@ XrdOss *XrdOssGetSS(XrdSysLogger *Logger, const char *config_fn, OssEroute.logger(Logger); if (!(myLib = new XrdOucPinLoader(&OssEroute, myOssSys.myVersion, "osslib", OssLib))) return 0; +// Declare the interface versions +// + XrdOssGetStorageSystem_t getOSS1; + const char *epName1 = "XrdOssGetStorageSystem"; + XrdOssGetStorageSystem2_t getOSS2; + const char *epName2 = "XrdOssGetStorageSystem2"; + +// First try finding version 2 of the initializer. If that fails try version 1. +// In the process, we will get an oss object if we succeed at all. +// + getOSS2 = (XrdOssGetStorageSystem2_t)myLib->Resolve(epName2); + if (getOSS2) ossP = getOSS2((XrdOss *)&myOssSys, Logger, config_fn, + OssParms, envP); + else {getOSS1 = (XrdOssGetStorageSystem_t)myLib->Resolve(epName1); + if (!getOSS1) return 0; + ossP = getOSS1((XrdOss *)&myOssSys, Logger, config_fn, OssParms); + } -// Now get the entry point of the object creator -// - ep = (XrdOss *(*)(XrdOss *, XrdSysLogger *, const char *, const char *)) - (myLib->Resolve("XrdOssGetStorageSystem")); - if (!ep) return 0; - -// Get the Object now +// Call the legacy EnvInfo() method and set what library we are using if it +// differs from what we wre passed. // - if ((ossP = ep((XrdOss *)&myOssSys, Logger, config_fn, OssParms)) && envP) + if (ossP && envP) {ossP->EnvInfo(envP); if (envP && strcmp(OssLib, myLib->Path())) envP->Put("oss.lib", myLib->Path()); diff --git a/src/XrdOuc/XrdOucCache.hh b/src/XrdOuc/XrdOucCache.hh index 2cd37149d82..287b1833024 100644 --- a/src/XrdOuc/XrdOucCache.hh +++ b/src/XrdOuc/XrdOucCache.hh @@ -279,6 +279,8 @@ virtual ~XrdOucCacheIO() {} // Always use Detach() instead of direct delete! such associations. */ +class XrdOucEnv; + class XrdOucCache { public: @@ -405,7 +407,8 @@ virtual ~XrdOucCache() {} { XrdOucCache *XrdOucGetCache(XrdSysLogger *Logger, // Where messages go const char *Config, // Config file used - const char *Parms); // Optional parm string + const char *Parms, // Optional parm string + XrdOucEnv *envP); // Optional environment } When Logger is null, you should use cerr to output messages. Otherwise, @@ -419,4 +422,7 @@ virtual ~XrdOucCache() {} a null pointer otherwise. The instance is used to create actual caches using the object's Create() method. */ + +typedef XrdOucCache *(*XrdOucCache_t)(XrdSysLogger *, const char *, + const char *, XrdOucEnv *); #endif diff --git a/src/XrdOuc/XrdOucCache2.hh b/src/XrdOuc/XrdOucCache2.hh index 5a368397ca4..c37b4310c72 100644 --- a/src/XrdOuc/XrdOucCache2.hh +++ b/src/XrdOuc/XrdOucCache2.hh @@ -368,6 +368,10 @@ virtual ~XrdOucCache2() {} //! is no configuration file is present. //! @param Parms Pointer to any parameters specified after the shared library //! path. If Parms is null, there are no parameters. +//! @param envP Pointer to environmental information. The most relevant +//! is whether or not -stream monitoring is enabled. +//! XrdXrootdGStream *gstm = envP->(XrddXrootdGStream *) +//! GetPtr("pfc.gStream*"); //! @return A usable, fully configured, instance of an XrdOucCache2 //! object upon success and a null pointer otherwise. This //! instance is used for all operations defined by methods in @@ -377,6 +381,10 @@ virtual ~XrdOucCache2() {} //! { //! XrdOucCache2 *XrdOucGetCache2(XrdSysLogger *Logger, // Where messages go //! const char *Config, // Config file used -//! const char *Parms); // Optional parm string -//! } +//! const char *Parms, // Optional parm string +//! } XrdOucEnv *envP); // Optional environment + +typedef XrdOucCache2 *(*XrdOucCache2_t)(XrdSysLogger *, const char *, + const char *, XrdOucEnv *); + #endif diff --git a/src/XrdOuc/XrdOucCacheData.cc b/src/XrdOuc/XrdOucCacheData.cc index b5e82e81430..1d839cbb534 100644 --- a/src/XrdOuc/XrdOucCacheData.cc +++ b/src/XrdOuc/XrdOucCacheData.cc @@ -152,8 +152,8 @@ XrdOucCacheIO *XrdOucCacheData::Detach() {char sBuff[4096]; snprintf(sBuff, sizeof(sBuff), "Cache: Stats: %lld Read; %lld Get; %lld Pass; " - "%lld Write; %lld Put; %d Hits; %d Miss; " - "%lld pead; %d HitsPR; %d MissPR; Path %s\n", + "%lld Write; %lld Put; %lld Hits; %lld Miss; " + "%lld pead; %lld HitsPR; %lld MissPR; Path %s\n", Statistics.BytesRead, Statistics.BytesGet, Statistics.BytesPass, Statistics.BytesWrite, Statistics.BytesPut, diff --git a/src/XrdOuc/XrdOucCacheStats.hh b/src/XrdOuc/XrdOucCacheStats.hh index d1a047ee046..1725ebae936 100644 --- a/src/XrdOuc/XrdOucCacheStats.hh +++ b/src/XrdOuc/XrdOucCacheStats.hh @@ -46,10 +46,10 @@ long long BytesGet; // Number of bytes delivered from the cache long long BytesPass; // Number of bytes read but not cached long long BytesWrite; // Total number of bytes written from the cache long long BytesPut; // Number of bytes updated in the cache -int Hits; // Number of times wanted data was in the cache -int Miss; // Number of times wanted data was *not* in the cache -int HitsPR; // Number of pages wanted data was just preread -int MissPR; // Number of pages wanted data was just read +long long Hits; // Number of times wanted data was in the cache +long long Miss; // Number of times wanted data was *not* in the cache +long long HitsPR; // Number of pages wanted data was just preread +long long MissPR; // Number of pages wanted data was just read inline void Get(XrdOucCacheStats &Dst) {sMutex.Lock(); diff --git a/src/XrdOuc/XrdOucPsx.cc b/src/XrdOuc/XrdOucPsx.cc index 98e16fd75bd..4bed88f9024 100644 --- a/src/XrdOuc/XrdOucPsx.cc +++ b/src/XrdOuc/XrdOucPsx.cc @@ -185,22 +185,14 @@ bool XrdOucPsx::ConfigCache(XrdSysError &eDest) // Get the Object now // if (isCache2) - {XrdOucCache2 *(*ep)(XrdSysLogger *, const char *, const char *); - ep = (XrdOucCache2 *(*)(XrdSysLogger *, const char *, const char *)) - (myLib.Resolve(cName)); - + {XrdOucCache2_t ep = (XrdOucCache2_t)myLib.Resolve(cName); if (!ep) return false; - - theCache2 = (XrdOucCache2*)ep(eDest.logger(), configFN, cParm); + theCache2 = (XrdOucCache2*)ep(eDest.logger(), configFN, cParm, theEnv); return theCache2 != 0; } else { - XrdOucCache *(*ep)(XrdSysLogger *, const char *, const char *); - ep = (XrdOucCache *(*)(XrdSysLogger *, const char *, const char *)) - (myLib.Resolve(cName)); - + XrdOucCache_t ep = (XrdOucCache_t)myLib.Resolve(cName); if (!ep) return false; - - theCache = (XrdOucCache*)ep(eDest.logger(), configFN, cParm); + theCache = (XrdOucCache*)ep(eDest.logger(), configFN, cParm, theEnv); return theCache != 0; } } diff --git a/src/XrdOuc/XrdOucPsx.hh b/src/XrdOuc/XrdOucPsx.hh index 6ee7c141778..72bc7caca33 100644 --- a/src/XrdOuc/XrdOucPsx.hh +++ b/src/XrdOuc/XrdOucPsx.hh @@ -94,8 +94,9 @@ bool xLfn2Pfn; bool xPfn2Lfn; bool xNameLib; - XrdOucPsx(XrdVersionInfo *vInfo, const char *cfn, XrdSysLogger *lp=0) - : configFN(strdup(cfn)), theLogger(lp), theEnv(0), + XrdOucPsx(XrdVersionInfo *vInfo, const char *cfn, + XrdSysLogger *lp=0, XrdOucEnv *vp=0) + : configFN(strdup(cfn)), theLogger(lp), theEnv(vp), theN2N(0), theCache(0), theCache2(0), initCCM(0), mCache(0), setFirst(0), setLast(0), maxRHCB(0), traceLvl(0), debugLvl(0), cioWait(0), cioTries(0), diff --git a/src/XrdPosix/XrdPosixXrootd.cc b/src/XrdPosix/XrdPosixXrootd.cc index b38c1cef344..39f5597d00b 100644 --- a/src/XrdPosix/XrdPosixXrootd.cc +++ b/src/XrdPosix/XrdPosixXrootd.cc @@ -1451,109 +1451,6 @@ long long XrdPosixXrootd::QueryOpaque(const char *path, char *value, int size) // return admin.Query(XrdCl::QueryCode::OpaqueFile, value, size); } - -/******************************************************************************/ -/* Obsolete! s e t C a c h e */ -/******************************************************************************/ - -void XrdPosixXrootd::setCache(XrdOucCache *cP) {XrdPosixGlobals::myCache =cP;} - -void XrdPosixXrootd::setCache(XrdOucCache2 *cP) {XrdPosixGlobals::myCache2=cP;} - -/******************************************************************************/ -/* Obsolete! s e t D e b u g */ -/******************************************************************************/ - -void XrdPosixXrootd::setDebug(int val, bool doDebug) -{ - const std::string dbgType[] = {"Info", "Warning", "Error", "Debug", "Dump"}; - -// The default is none but once set it cannot be unset in the client -// - if (val > 0) - {if (doDebug) val = 4; - else if (val > 5) val = 5; - XrdCl::DefaultEnv::SetLogLevel(dbgType[val-1]); - } - -// Now set the internal one which can be toggled -// - XrdPosixMap::SetDebug(val > 0); -} - -/******************************************************************************/ -/* Obsolete! s e t E n v */ -/******************************************************************************/ - -void XrdPosixXrootd::setEnv(const char *kword, int kval) -{ - XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv(); - static bool dlfSet = false; - -// Check for internal envars before setting the external one -// - if (!strcmp(kword, "DirlistAll")) - {XrdPosixGlobals::dlFlag = (kval ? XrdCl::DirListFlags::Locate - : XrdCl::DirListFlags::None); - dlfSet = true; - } - else if (!strcmp(kword, "DirlistDflt")) - {if (!dlfSet) - XrdPosixGlobals::dlFlag = (kval ? XrdCl::DirListFlags::Locate - : XrdCl::DirListFlags::None); - } - else env->PutInt((std::string)kword, kval); -} - -/******************************************************************************/ -/* Obsolete! s e t I P V 4 */ -/******************************************************************************/ - -void XrdPosixXrootd::setIPV4(bool usev4) -{ - const char *ipmode = (usev4 ? "IPv4" : "IPAll"); - XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv(); - -// Set the env value -// - env->PutString((std::string)"NetworkStack", (const std::string)ipmode); -} - -/******************************************************************************/ -/* Obsolete! s e t L o g g e r */ -/******************************************************************************/ - -void XrdPosixXrootd::setLogger(XrdSysLogger *logP) -{ - XrdPosixGlobals::Trace.SetLogger(logP); -} - -/******************************************************************************/ -/* Obsolete! s e t N u m C B */ -/******************************************************************************/ - -void XrdPosixXrootd::setNumCB(int numcb) -{ - if (numcb >= 0) XrdPosixFileRH::SetMax(numcb); -} - -/******************************************************************************/ -/* Obsolete! S e t N 2 N */ -/******************************************************************************/ - -void XrdPosixXrootd::setN2N(XrdOucName2Name *pN2N, int opts) -{ - XrdPosixGlobals::theN2N = pN2N; -} - -/******************************************************************************/ -/* Obsolete! s e t S c h e d */ -/******************************************************************************/ - -void XrdPosixXrootd::setSched(XrdScheduler *sP) -{ - XrdPosixGlobals::schedP = sP; -} /******************************************************************************/ /* P r i v a t e M e t h o d s */ diff --git a/src/XrdPosix/XrdPosixXrootd.hh b/src/XrdPosix/XrdPosixXrootd.hh index fda7d21149c..970e9331504 100644 --- a/src/XrdPosix/XrdPosixXrootd.hh +++ b/src/XrdPosix/XrdPosixXrootd.hh @@ -373,20 +373,6 @@ static bool myFD(int fd); XrdPosixXrootd(int maxfd=255, int maxdir=0, int maxthr=0); ~XrdPosixXrootd(); -// The following methods were always considered private. They are no longer -// used and will be removed on the next major release! They are only here for -// now to keep ABI compatability for the 4.x and prior releases. -// -static void setCache(XrdOucCache *cP); -static void setCache(XrdOucCache2 *cP); -static void setDebug(int val, bool doDebug=false); -static void setEnv(const char *kword, int kval); -static void setIPV4(bool userv4); -static void setLogger(XrdSysLogger *logP); -static void setNumCB(int numcb); -static void setN2N(XrdOucName2Name *pN2N, int opts=0); -static void setSched(XrdScheduler *sP); - private: static int Fault(XrdPosixFile *fp, int ecode); diff --git a/src/XrdPss/XrdPss.cc b/src/XrdPss/XrdPss.cc index e85357fc817..41da7f5bd18 100644 --- a/src/XrdPss/XrdPss.cc +++ b/src/XrdPss/XrdPss.cc @@ -92,6 +92,8 @@ static XrdPssSys XrdProxySS; XrdOucSid *sidP = 0; + XrdOucEnv *envP = 0; + static const char *ofslclCGI = "ofs.lcl=1"; static const char *osslclCGI = "oss.lcl=1"; @@ -114,15 +116,17 @@ XrdVERSIONINFO(XrdOssGetStorageSystem,XrdPss); // extern "C" { -XrdOss *XrdOssGetStorageSystem(XrdOss *native_oss, - XrdSysLogger *Logger, - const char *config_fn, - const char *parms) +XrdOss *XrdOssGetStorageSystem2(XrdOss *native_oss, + XrdSysLogger *Logger, + const char *cFN, + const char *parms, + XrdOucEnv *envp) { // Ignore the parms (we accept none for now) and call the init routine // - return (XrdProxySS.Init(Logger, config_fn) ? 0 : (XrdOss *)&XrdProxySS); + envP = envp; + return (XrdProxySS.Init(Logger, cFN) ? 0 : (XrdOss *)&XrdProxySS); } } @@ -148,7 +152,7 @@ XrdPssSys::XrdPssSys() : LocalRoot(0), theN2N(0), DirFlags(0), Output: Returns zero upon success otherwise (-errno). */ -int XrdPssSys::Init(XrdSysLogger *lp, const char *configfn) +int XrdPssSys::Init(XrdSysLogger *lp, const char *cFN) { int NoGo; const char *tmp; @@ -157,11 +161,11 @@ int XrdPssSys::Init(XrdSysLogger *lp, const char *configfn) // SysTrace.SetLogger(lp); eDest.logger(lp); - eDest.Say("Copr. 2018, Stanford University, Pss Version " XrdVSTRING); + eDest.Say("Copr. 2019, Stanford University, Pss Version " XrdVSTRING); // Initialize the subsystems // - tmp = ((NoGo=Configure(configfn)) ? "failed." : "completed."); + tmp = ((NoGo = Configure(cFN)) ? "failed." : "completed."); eDest.Say("------ Proxy storage system initialization ", tmp); // All done. diff --git a/src/XrdPss/XrdPssConfig.cc b/src/XrdPss/XrdPssConfig.cc index 46445a4dad5..b7258cb0e38 100644 --- a/src/XrdPss/XrdPssConfig.cc +++ b/src/XrdPss/XrdPssConfig.cc @@ -68,6 +68,8 @@ #include "XrdPosix/XrdPosixXrootd.hh" #include "XrdPosix/XrdPosixXrootdPath.hh" +#include "XrdXrootd/XrdXrootdGStream.hh" + /******************************************************************************/ /* d e f i n e s */ /******************************************************************************/ @@ -118,6 +120,8 @@ extern XrdSysError eDest; extern XrdOucSid *sidP; +extern XrdOucEnv *envP; + extern XrdSysTrace SysTrace; static const int maxHLen = 1024; @@ -156,9 +160,9 @@ int XrdPssSys::Configure(const char *cfn) // XrdOucEnv::Export("XRDXROOTD_NOPOSC", "1"); -// Create a configurator +// Create a configurator. It will be deleted when we are done. // - psxConfig = new XrdOucPsx(myVersion, cfn, eDest.logger()); // Deleted later + psxConfig = new XrdOucPsx(myVersion, cfn, eDest.logger(), envP); // Set debug level if so wanted // @@ -669,7 +673,7 @@ int XrdPssSys::xorig(XrdSysError *errp, XrdOucStream &Config) // of domain. Composite listings are normally disabled for out of domain nodes. // if (!index(mval, '.') || !strcmp(getDomain(mval), getDomain(myHost))) - XrdPosixXrootd::setEnv("DirlistDflt", 1); + XrdPosixConfig::SetEnv("DirlistDflt", 1); // All done // diff --git a/src/XrdServer.cmake b/src/XrdServer.cmake index a3d254c3a70..72dc7e2abb7 100644 --- a/src/XrdServer.cmake +++ b/src/XrdServer.cmake @@ -39,6 +39,8 @@ add_library( XrdXrootd/XrdXrootdProtocol.cc XrdXrootd/XrdXrootdProtocol.hh XrdXrootd/XrdXrootdResponse.cc XrdXrootd/XrdXrootdResponse.hh XrdXrootd/XrdXrootdStats.cc XrdXrootd/XrdXrootdStats.hh + XrdXrootd/XrdXrootdGSReal.cc XrdXrootd/XrdXrootdGSReal.hh + XrdXrootd/XrdXrootdGStream.cc XrdXrootd/XrdXrootdGStream.hh XrdXrootd/XrdXrootdTransit.cc XrdXrootd/XrdXrootdTransit.hh XrdXrootd/XrdXrootdTransPend.cc XrdXrootd/XrdXrootdTransPend.hh XrdXrootd/XrdXrootdTransSend.cc XrdXrootd/XrdXrootdTransSend.hh diff --git a/src/XrdXrootd/XrdXrootdConfig.cc b/src/XrdXrootd/XrdXrootdConfig.cc index bc1d5e23c8d..882616d2ede 100644 --- a/src/XrdXrootd/XrdXrootdConfig.cc +++ b/src/XrdXrootd/XrdXrootdConfig.cc @@ -67,6 +67,7 @@ #include "XrdXrootd/XrdXrootdFile.hh" #include "XrdXrootd/XrdXrootdFileLock.hh" #include "XrdXrootd/XrdXrootdFileLock1.hh" +#include "XrdXrootd/XrdXrootdGSReal.hh" #include "XrdXrootd/XrdXrootdJob.hh" #include "XrdXrootd/XrdXrootdMonitor.hh" #include "XrdXrootd/XrdXrootdPrepare.hh" @@ -272,6 +273,16 @@ int XrdXrootdProtocol::Configure(char *parms, XrdProtocol_Config *pi) // if (pi->theEnv) myEnv.PutPtr("xrdEnv*", pi->theEnv); +// Initialize monitoring (it won't do anything if it wasn't enabled) +// + if (!XrdXrootdMonitor::Init(Sched, &eDest, pi->myName, pi->myProg, + myInst, Port)) return 0; + +// Config g-stream objects, as needed. This needs to be done before we +// load any plugins but after we initialize monitoring. +// + ConfigGStream(myEnv); + // Get the filesystem to be used // if (FSLib[0]) @@ -417,11 +428,6 @@ int XrdXrootdProtocol::Configure(char *parms, XrdProtocol_Config *pi) } while(xp); } -// Initialize monitoring (it won't do anything if it wasn't enabled) -// - if (!XrdXrootdMonitor::Init(Sched, &eDest, pi->myName, pi->myProg, - myInst, Port)) return 0; - // Add all jobs that we can run to the admin object // if (JobCKS) XrdXrootdAdmin::addJob("chksum", JobCKS); @@ -563,6 +569,35 @@ void XrdXrootdProtocol::PidFile() if (xop) eDest.Emsg("Config", errno, xop, pidFN); } +/******************************************************************************/ +/* C o n f i g G S t r e a m */ +/******************************************************************************/ + +void XrdXrootdProtocol::ConfigGStream(XrdOucEnv &myEnv) +{ + struct GSTable {const char *pin; int Mode; char Type;} gsObj[] = + {{"ccm", XROOTD_MON_CCM, XROOTD_MON_GSCCM}, + {"pfc", XROOTD_MON_PFC, XROOTD_MON_GSPFC} + }; + int numgs = sizeof(gsObj)/sizeof(struct GSTable); + int flint = XrdXrootdMonitor::Flushing(); + char vbuff[64]; + +// For each enabled monitoring provider, allocate a g-stream and put +// its address in our environment. +// + for (int i = 0; i < numgs; i++) + {if (XrdXrootdMonitor::ModeEnabled(gsObj[i].Mode)) + {XrdXrootdGStream *gs = new XrdXrootdGSReal(gsObj[i].pin, + gsObj[i].Type, + gsObj[i].Mode, flint); + snprintf(vbuff, sizeof(vbuff), "%s.gStream*", gsObj[i].pin); + myEnv.PutPtr(vbuff, (void *)gs); + } + } +} + + /******************************************************************************/ /* C o n f i g S e c u r i t y */ /******************************************************************************/ @@ -1162,7 +1197,7 @@ int XrdXrootdProtocol::xlog(XrdOucStream &Config) [rnums ] [window ] dest [Events] - Events: [files] [fstat] [info] [io] [iov] [redir] [user] + Events: [ccm] [files] [fstat] [info] [io] [iov] [pfc] [redir] [user] all enables monitoring for all connections. auth add authentication information to "user". @@ -1182,11 +1217,13 @@ int XrdXrootdProtocol::xlog(XrdOucStream &Config) window time (seconds, M, H) between timing marks. dest specified routing information. Up to two dests may be specified. + ccm monitor cache context management files only monitors file open/close events. fstats vectors the "f" stream to the destination info monitors client appid and info requests. io monitors I/O requests, and files open/close events. iov like I/O but also unwinds vector reads. + pfc monitor proxy file cache redir monitors request redirections user monitors user login and disconnect events. where monitor records are to be sentvia UDP. @@ -1284,12 +1321,14 @@ int XrdXrootdProtocol::xmon(XrdOucStream &Config) for (i = 0; i < 2; i++) {if (strcmp("dest", val)) break; while((val = Config.GetWord())) - if (!strcmp("files",val)) monMode[i] |= XROOTD_MON_FILE; + if (!strcmp("ccm", val)) monMode[i] |= XROOTD_MON_CCM; + else if (!strcmp("files",val)) monMode[i] |= XROOTD_MON_FILE; else if (!strcmp("fstat",val)) monMode[i] |= XROOTD_MON_FSTA; else if (!strcmp("info", val)) monMode[i] |= XROOTD_MON_INFO; else if (!strcmp("io", val)) monMode[i] |= XROOTD_MON_IO; else if (!strcmp("iov", val)) monMode[i] |= (XROOTD_MON_IO |XROOTD_MON_IOV); + else if (!strcmp("pfc", val)) monMode[i] |= XROOTD_MON_PFC; else if (!strcmp("redir",val)) monMode[i] |= XROOTD_MON_REDR; else if (!strcmp("user", val)) monMode[i] |= XROOTD_MON_USER; else break; diff --git a/src/XrdXrootd/XrdXrootdGSReal.cc b/src/XrdXrootd/XrdXrootdGSReal.cc new file mode 100644 index 00000000000..b9e93388ef8 --- /dev/null +++ b/src/XrdXrootd/XrdXrootdGSReal.cc @@ -0,0 +1,286 @@ +/******************************************************************************/ +/* */ +/* X r d X r o o t d G S R e a l . h h */ +/* */ +/* (c) 2019 by the Board of Trustees of the Leland Stanford, Jr., University */ +/* All Rights Reserved */ +/* Produced by Andrew Hanushevsky for Stanford University under contract */ +/* DE-AC02-76-SFO0515 with the Department of Energy */ +/* */ +/* This file is part of the XRootD software suite. */ +/* */ +/* XRootD is free software: you can redistribute it and/or modify it under */ +/* the terms of the GNU Lesser General Public License as published by the */ +/* Free Software Foundation, either version 3 of the License, or (at your */ +/* option) any later version. */ +/* */ +/* XRootD is distributed in the hope that it will be useful, but WITHOUT */ +/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ +/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ +/* License for more details. */ +/* */ +/* You should have received a copy of the GNU Lesser General Public License */ +/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ +/* COPYING (GPL license). If not, see . */ +/* */ +/* The copyright holder's institutional names and contributor's names may not */ +/* be used to endorse or promote products derived from this software without */ +/* specific prior written permission of the institution or contributor. */ +/******************************************************************************/ + +#include +#include + +#include "Xrd/XrdScheduler.hh" +#include "XrdSys/XrdSysPlatform.hh" +#include "XrdXrootd/XrdXrootdGSReal.hh" + +/******************************************************************************/ +/* G l o b a l s */ +/******************************************************************************/ + +namespace XrdXrootdMonInfo +{ +extern XrdScheduler *Sched; +extern char *monHost; +extern long long mySID; +extern int startTime; +} + +using namespace XrdXrootdMonInfo; + +/******************************************************************************/ +/* C o n s t r u c t o r */ +/******************************************************************************/ + +XrdXrootdGSReal::XrdXrootdGSReal(const char *gNamePI, char gDataID, + int mtype, int flint) + : XrdJob("GStream"), XrdXrootdGStream(*this) +{ + char idBuff[1024]; + +// Initialze the udp buffer +// + memset(&gMsg.info, 0, sizeof(gMsg.info)); + gMsg.info.hdr.code = XROOTD_MON_MAPGSTA; + gMsg.info.hdr.stod = startTime; + + long long theSID = ntohll(mySID) & 0x00ffffffffffffff; + gMsg.info.sID = htonll(theSID | (static_cast(gDataID) << 56)); + +// Setup buffer pointers +// + udpBFirst = udpBNext = gMsg.buff; + udpBEnd = gMsg.buff + sizeof(gMsg.buff) - 1; + +// Initialize remaining variables +// + monType = mtype; + rsvbytes = 0; + afRunning = false; + SetAutoFlush(flint); + +// Construct our user name as in .0:0@ +// + snprintf(idBuff, sizeof(idBuff), "%s.0:0@%s", gNamePI, monHost); + +// Register ourselves +// + gMon.Register(idBuff, monHost, "xroot"); +} + +/******************************************************************************/ +/* A u t o F l u s h */ +/******************************************************************************/ + +void XrdXrootdGSReal::AutoFlush() // gMutex is locked outside constructor +{ + if (afTime && !afRunning) + {Sched->Schedule((XrdJob *)this, time(0)+afTime); + afRunning = true; + } +} + +/******************************************************************************/ +/* D o I t */ +/******************************************************************************/ + +void XrdXrootdGSReal::DoIt() +{ + XrdSysMutexHelper gHelp(gMutex); + +// Check if we need to do anything here +// + afRunning = false; + if (afTime) + {if (gMsg.info.tBeg && time(0)-gMsg.info.tBeg >= afTime) Expel(0); + AutoFlush(); + } +} + +/******************************************************************************/ +/* Private: E x p e l */ +/******************************************************************************/ + +void XrdXrootdGSReal::Expel(int dlen) // gMutex is held +{ + int size; + +// Check if we need to flush this buffer. +// + if (udpBFirst == udpBNext || (dlen && (udpBNext + dlen) < udpBEnd)) return; + +// Complete the buffer +// + size = udpBNext-(char *)&gMsg; + gMsg.info.hdr.pseq++; + gMsg.info.hdr.plen = htons(static_cast(size)); + *(udpBNext-1) = 0; + +// Send off the packet +// + XrdXrootdMonitor::Send(monType, &gMsg, size); + +// Reset the buffer +// + udpBNext = udpBFirst; + gMsg.info.tBeg = gMsg.info.tEnd = 0; +} + +/******************************************************************************/ +/* F l u s h */ +/******************************************************************************/ + +void XrdXrootdGSReal::Flush() +{ + XrdSysMutexHelper gHelp(gMutex); + Expel(0); +} + +/******************************************************************************/ +/* G e t D i c t I D */ +/******************************************************************************/ + +uint32_t XrdXrootdGSReal::GetDictID(const char *text, bool isPath) +{ +// Record the mapping and return it +// + return (isPath ? gMon.MapPath(text) : gMon.MapInfo(text)); +} + +/******************************************************************************/ +/* I n s e r t */ +/******************************************************************************/ + +bool XrdXrootdGSReal::Insert(const char *data, int dlen) +{ + +// Validate the length and message +// + if (dlen < 8 || dlen > XrdXrootdGStream::MaxDataLen + || !data || data[dlen-1]) return false; + +// Reserve the storage and copy the message. It always will end with a newline +// + gMutex.Lock(); + Expel(dlen); + memcpy(udpBNext, data, dlen-1); + udpBNext[dlen-1] = '\n'; + +// Timestamp the record and aAdjust buffer pointers +// + gMsg.info.tEnd = time(0); + if (udpBNext == udpBFirst) gMsg.info.tBeg = gMsg.info.tEnd; + udpBNext += dlen; + +// All done +// + gMutex.UnLock(); + return true; +} + +/******************************************************************************/ + +bool XrdXrootdGSReal::Insert(int dlen) +{ + XrdSysMutexHelper gHelp(gMutex); + +// Make sure space is reserved +// + if (!rsvbytes) return false; + +// We are now sure that the recursive lock is held twice by this thread. So, +// make it a unitary lock so it gets fully unlocked upon rturn. +// + gMutex.UnLock(); + +// Check for cancellation +// + if (!dlen) + {rsvbytes = 0; + return true; + } + +// Length, it must >= 8 and <= reserved amount and the data must end with a 0. +// + if (dlen > rsvbytes || dlen < 8 || *(udpBNext+dlen-1)) + {rsvbytes = 0; + return false; + } + +// Adjust the buffer space and time stamp the record +// + gMsg.info.tEnd = time(0); + if (udpBNext == udpBFirst) gMsg.info.tBeg = gMsg.info.tEnd; + udpBNext += dlen; + *(udpBNext-1) = '\n'; + rsvbytes = 0; + +// All done + + return true; +} + +/******************************************************************************/ +/* R e s e r v e */ +/******************************************************************************/ + +char *XrdXrootdGSReal::Reserve(int dlen) +{ +// Validate the length +// + if (dlen < 8 || dlen > XrdXrootdGStream::MaxDataLen) return 0; + +// Make sure there is no reserve outstanding +// + gMutex.Lock(); + if (rsvbytes) + {gMutex.UnLock(); + return 0; + } + +// Return the allocated the space but keep the lock until Insert() is called. +// + rsvbytes = dlen; + Expel(dlen); + return udpBNext; +} + +/******************************************************************************/ +/* S e t A u t o F l u s h */ +/******************************************************************************/ + +int XrdXrootdGSReal::SetAutoFlush(int afsec) +{ + XrdSysMutexHelper gHelp(gMutex); + +// Save the current settting and establish the new one and relaunch +// + int afNow = afTime; + afTime = (afsec > 0 ? afsec : 0); + AutoFlush(); + +// All done +// + return afNow; +} diff --git a/src/XrdXrootd/XrdXrootdGSReal.hh b/src/XrdXrootd/XrdXrootdGSReal.hh new file mode 100644 index 00000000000..4729c54eadb --- /dev/null +++ b/src/XrdXrootd/XrdXrootdGSReal.hh @@ -0,0 +1,103 @@ +#ifndef __XRDXROOTDGSREAL_HH_ +#define __XRDXROOTDGSREAL_HH_ +/******************************************************************************/ +/* */ +/* X r d X r o o t d G S R e a l . h h */ +/* */ +/* (c) 2019 by the Board of Trustees of the Leland Stanford, Jr., University */ +/* All Rights Reserved */ +/* Produced by Andrew Hanushevsky for Stanford University under contract */ +/* DE-AC02-76-SFO0515 with the Department of Energy */ +/* */ +/* This file is part of the XRootD software suite. */ +/* */ +/* XRootD is free software: you can redistribute it and/or modify it under */ +/* the terms of the GNU Lesser General Public License as published by the */ +/* Free Software Foundation, either version 3 of the License, or (at your */ +/* option) any later version. */ +/* */ +/* XRootD is distributed in the hope that it will be useful, but WITHOUT */ +/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ +/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ +/* License for more details. */ +/* */ +/* You should have received a copy of the GNU Lesser General Public License */ +/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ +/* COPYING (GPL license). If not, see . */ +/* */ +/* The copyright holder's institutional names and contributor's names may not */ +/* be used to endorse or promote products derived from this software without */ +/* specific prior written permission of the institution or contributor. */ +/******************************************************************************/ + +#include "Xrd/XrdJob.hh" +#include "XrdSys/XrdSysPthread.hh" +#include "XrdXrootd/XrdXrootdGStream.hh" +#include "XrdXrootd/XrdXrootdMonData.hh" +#include "XrdXrootd/XrdXrootdMonitor.hh" + +//----------------------------------------------------------------------------- +//! This class implements a generic reporter for the XRootD monitoring stream, +//! also known as the G-Stream. It's base class is passed around to various +//! plugins to allow them to add monitoring information into the G-Stream. +//----------------------------------------------------------------------------- + +class XrdXrootdGSReal : public XrdJob, public XrdXrootdGStream +{ +public: + +void DoIt(); // XrdJob override + +void Flush(); + +uint32_t GetDictID(const char *text, bool isPath=false); + +bool Insert(const char *data, int dlen); + +bool Insert(int dlen); + +char *Reserve(int dlen); + +int SetAutoFlush(int afsec); + +//----------------------------------------------------------------------------- +//! Constructor +//! +//! @param gNamePI the plugin name. +//! @param gDataID the G-Stream identifier associated with all of the data +//! that will be placed in the stream using this object. +//! See XrdXrootdMonData.hh for valid subtypes. +//! @param mtype the monitor type for send routing. +//! @param flint the autoflush interval. +//----------------------------------------------------------------------------- + + XrdXrootdGSReal(const char *gNamePI, char gDataID, + int mtype, int flint); + +//----------------------------------------------------------------------------- +//! Destructor. Normally, this object is never deleted. +//----------------------------------------------------------------------------- + + ~XrdXrootdGSReal() {} + +private: + +void AutoFlush(); +void Expel(int dlen); + +XrdSysRecMutex gMutex; +char *udpBFirst; +char *udpBNext; +char *udpBEnd; +int rsvbytes; +int monType; +int afTime; +bool afRunning; + +XrdXrootdMonitor::User gMon; + +struct GStream {XrdXrootdMonGS info; + char buff[64536-sizeof(info)]; + } gMsg; +}; +#endif diff --git a/src/XrdXrootd/XrdXrootdGStream.cc b/src/XrdXrootd/XrdXrootdGStream.cc new file mode 100644 index 00000000000..fc4c8585b95 --- /dev/null +++ b/src/XrdXrootd/XrdXrootdGStream.cc @@ -0,0 +1,70 @@ +/******************************************************************************/ +/* */ +/* X r d X r o o t d G S t r e a m . c c */ +/* */ +/* (c) 2019 by the Board of Trustees of the Leland Stanford, Jr., University */ +/* All Rights Reserved */ +/* Produced by Andrew Hanushevsky for Stanford University under contract */ +/* DE-AC02-76-SFO0515 with the Department of Energy */ +/* */ +/* This file is part of the XRootD software suite. */ +/* */ +/* XRootD is free software: you can redistribute it and/or modify it under */ +/* the terms of the GNU Lesser General Public License as published by the */ +/* Free Software Foundation, either version 3 of the License, or (at your */ +/* option) any later version. */ +/* */ +/* XRootD is distributed in the hope that it will be useful, but WITHOUT */ +/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ +/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ +/* License for more details. */ +/* */ +/* You should have received a copy of the GNU Lesser General Public License */ +/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ +/* COPYING (GPL license). If not, see . */ +/* */ +/* The copyright holder's institutional names and contributor's names may not */ +/* be used to endorse or promote products derived from this software without */ +/* specific prior written permission of the institution or contributor. */ +/******************************************************************************/ + +#include "XrdXrootd/XrdXrootdGStream.hh" +#include "XrdXrootd/XrdXrootdGSReal.hh" + +/******************************************************************************/ +/* F l u s h */ +/******************************************************************************/ + +void XrdXrootdGStream::Flush() {gStream.Flush();} + +/******************************************************************************/ +/* G e t D i c t I D */ +/******************************************************************************/ + +uint32_t XrdXrootdGStream::GetDictID(const char *text, bool isPath) + {return gStream.GetDictID(text, isPath);} + +/******************************************************************************/ +/* I n s e r t */ +/******************************************************************************/ + +bool XrdXrootdGStream::Insert(const char *data, int dlen) + {return gStream.Insert(data, dlen);} + +bool XrdXrootdGStream::Insert(int dlen) {return gStream.Insert(dlen);} + +/******************************************************************************/ +/* R e s e r v e */ +/******************************************************************************/ + +char *XrdXrootdGStream::Reserve(int dlen) {return gStream.Reserve(dlen);} + +/******************************************************************************/ +/* S e t A u t o F l u s h */ +/******************************************************************************/ + +int XrdXrootdGStream::SetAutoFlush(int afsec) + {if (afsec < 0) afsec = 0; + else if (afsec < 60) afsec = 60; + return gStream.SetAutoFlush(afsec); + } diff --git a/src/XrdXrootd/XrdXrootdGStream.hh b/src/XrdXrootd/XrdXrootdGStream.hh new file mode 100644 index 00000000000..482c02bd7b2 --- /dev/null +++ b/src/XrdXrootd/XrdXrootdGStream.hh @@ -0,0 +1,154 @@ +#ifndef __XRDXROOTDGSTREAM_HH_ +#define __XRDXROOTDGSTREAM_HH_ +/******************************************************************************/ +/* */ +/* X r d X r o o t G S t r e a m . h h */ +/* */ +/* (c) 2019 by the Board of Trustees of the Leland Stanford, Jr., University */ +/* All Rights Reserved */ +/* Produced by Andrew Hanushevsky for Stanford University under contract */ +/* DE-AC02-76-SFO0515 with the Department of Energy */ +/* */ +/* This file is part of the XRootD software suite. */ +/* */ +/* XRootD is free software: you can redistribute it and/or modify it under */ +/* the terms of the GNU Lesser General Public License as published by the */ +/* Free Software Foundation, either version 3 of the License, or (at your */ +/* option) any later version. */ +/* */ +/* XRootD is distributed in the hope that it will be useful, but WITHOUT */ +/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ +/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ +/* License for more details. */ +/* */ +/* You should have received a copy of the GNU Lesser General Public License */ +/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ +/* COPYING (GPL license). If not, see . */ +/* */ +/* The copyright holder's institutional names and contributor's names may not */ +/* be used to endorse or promote products derived from this software without */ +/* specific prior written permission of the institution or contributor. */ +/******************************************************************************/ + +#include + +//----------------------------------------------------------------------------- +//! This class implements a generic reporter for the XRootD monitoring stream, +//! also known as the G-Stream. It is passed to various plugins when monitoring +//! is enabled to allow plugins to add monitoring information into the G-Stream. +//----------------------------------------------------------------------------- + +class XrdXrootdGSReal; + +class XrdXrootdGStream +{ +public: + +//----------------------------------------------------------------------------- +//! Flush any pending monitoring messages to the data collector. Also, see +//! the related SetAutoFlush() method. +//----------------------------------------------------------------------------- + +void Flush(); + +//----------------------------------------------------------------------------- +//! Obtain a dictionary ID to map a string to an integer ID. The mapping is +//! automatically sent to the monitor collector for future use using the +//! 'd' or 'i' mapping identifier. +//! +//! @param text -> the null terminated string to be assigned an ID. The +//! text must be less than or equal to 1024 characters. +//! @param isPath when true the text specified a file system path and +//! identified as a XROOTD_MON_MAPPATH item. Otherwise, +//! it is identified as a XROOTD_MON_MAPINFO item. +//! +//! @return the integer identifier assigned to the string information. The +//! returned value is in network byte order! +//----------------------------------------------------------------------------- + +uint32_t GetDictID(const char *text, bool isPath=false); + +//----------------------------------------------------------------------------- +//! Insert information into the G-Stream. +//! +//! @param data -> to null-terminated text to be included in the G-Stream. +//! @param dlen the length of the text *including* the null character. +//! Requires that (8 <= dlen <= MaxDataLen); defined below. +//! +//! @return true data included. +//! @return false data rejected; invalid length or is not null terminated. +//----------------------------------------------------------------------------- + +bool Insert(const char *data, int dlen); + +//----------------------------------------------------------------------------- +//! Insert information into the G-Stream using the data placed in the buffer +//! space obtained by a previous call to Reserve(). Upon return, this object +//! is unlocked. +//! +//! @param dlen the number of bytes actually present in the buffer. The +//! text must end with a null byte and dlen must be +//! 8 <= dlen <= dlen used in the previous Reserve() call. +//! +//! @return true data included. +//! @return false data rejected; invalid length or no buffer outstanding. +//----------------------------------------------------------------------------- + +bool Insert(int dlen); + +//----------------------------------------------------------------------------- +//! Obtain a buffer space for information. This object is locked and no other +//! thread can insert information until the buffer is inserted using Insert(). +//! +//! @param dlen the number of bytes required to be available for use. +//! Requires that (8 <= dlen <= MaxDataLen); defined below. +//! +//! @return !0 pointer to a dlen sized buffer. +//! @return =0 invalid length specified or a buffer is outstanding. +//----------------------------------------------------------------------------- + +char *Reserve(int dlen); + +//----------------------------------------------------------------------------- +//! Set autoflush time interval (or disable it). Disabling autoflush may be +//! useful when data is periodically generated at a low rate and manual +//! flushing would produce more consistent results. +//! +//! @param afsec Number of seconds between autoflushing. A zero or +//! negative value disables autoflush. The default is +//! 600 seconds (i.e. 10 minutes) subject to what is +//! specified via the xrootd.monitor flush directive. +//! Positive values less that 60 are considered to be 60. +//! +//! @return The previous auto-flush setting. +//----------------------------------------------------------------------------- + +int SetAutoFlush(int afsec); + +//----------------------------------------------------------------------------- +//! The larest amount of data that can be inserted in a single call to GStream. +//----------------------------------------------------------------------------- +static +const int MaxDataLen = 65280; + +//----------------------------------------------------------------------------- +//! Constructor +//! +//! @param gsRef refrence to the G-Stream implementation. +//----------------------------------------------------------------------------- + + XrdXrootdGStream(XrdXrootdGSReal &gsRef) : gStream(gsRef) {} + +protected: + +//----------------------------------------------------------------------------- +//! Destructor. This stream should never be directly deleted. +//----------------------------------------------------------------------------- + + ~XrdXrootdGStream() {} + +private: + +XrdXrootdGSReal &gStream; +}; +#endif diff --git a/src/XrdXrootd/XrdXrootdMonData.hh b/src/XrdXrootd/XrdXrootdMonData.hh index 9f5cc7de4ac..ef63bdf1b0d 100644 --- a/src/XrdXrootd/XrdXrootdMonData.hh +++ b/src/XrdXrootd/XrdXrootdMonData.hh @@ -77,6 +77,13 @@ struct XrdXrootdMonBurr XrdXrootdMonRedir info[sizeof(XrdXrootdMonRedir)]; //This is really [n] }; +struct XrdXrootdMonGS + {XrdXrootdMonHeader hdr; + int tBeg; // time(0) of the first record + int tEnd; // time(0) of the last record + kXR_int64 sID; // Server id in lower 48 bits +}; // Information provider top 8 bits. + struct XrdXrootdMonMap {XrdXrootdMonHeader hdr; kXR_unt32 dictid; @@ -96,6 +103,7 @@ const kXR_char XROOTD_MON_WINDOW = 0xe0; const kXR_char XROOTD_MON_MAPIDNT = '='; const kXR_char XROOTD_MON_MAPPATH = 'd'; const kXR_char XROOTD_MON_MAPFSTA = 'f'; // The "f" stream +const kXR_char XROOTD_MON_MAPGSTA = 'g'; // The "g" stream const kXR_char XROOTD_MON_MAPINFO = 'i'; const kXR_char XROOTD_MON_MAPMIGR = 'm'; // Internal use only! const kXR_char XROOTD_MON_MAPPURG = 'p'; @@ -105,6 +113,9 @@ const kXR_char XROOTD_MON_MAPTRCE = 't'; const kXR_char XROOTD_MON_MAPUSER = 'u'; const kXR_char XROOTD_MON_MAPXFER = 'x'; +const kXR_char XROOTD_MON_GSCCM = 'M'; // pfc: Cache context mgt info +const kXR_char XROOTD_MON_GSPFC = 'C'; // pfc: Cache monitoring info + // The following bits are insert in the low order 4 bits of the MON_REDIRECT // entry code to indicate the actual operation that was requestded. // @@ -137,6 +148,10 @@ const int XROOTD_MON_SRCMASK = 0x000000f; const int XROOTD_MON_TRGMASK = 0x7fffff0; const int XROOTD_MON_NEWSTID = 0x8000000; +const long long XROOTD_MON_SIDMASK = 0x0000ffffffffffff; +const long long XROOTD_MON_PIDMASK = 0xff; +const long long XROOTD_MON_PIDSHFT = 56; + /******************************************************************************/ /* " f " S t r e a m S p e c i f i c R e c o r d s */ /******************************************************************************/ diff --git a/src/XrdXrootd/XrdXrootdMonFile.cc b/src/XrdXrootd/XrdXrootdMonFile.cc index 16210e8fcf5..dbe63106bf3 100644 --- a/src/XrdXrootd/XrdXrootdMonFile.cc +++ b/src/XrdXrootd/XrdXrootdMonFile.cc @@ -44,15 +44,16 @@ namespace XrdXrootdMonInfo { -extern long long mySID; +extern XrdScheduler *Sched; +extern XrdSysError *eDest; +extern long long mySID; +extern int32_t startTime; } /******************************************************************************/ /* S t a t i c M e m b e r s */ /******************************************************************************/ -XrdSysError *XrdXrootdMonFile::eDest = 0; -XrdScheduler *XrdXrootdMonFile::Sched = 0; XrdSysMutex XrdXrootdMonFile::bfMutex; XrdSysMutex XrdXrootdMonFile::fmMutex; XrdXrootdMonFMap XrdXrootdMonFile::fmMap[XrdXrootdMonFMap::mapNum]; @@ -239,7 +240,7 @@ void XrdXrootdMonFile::DoIt() // Reschedule ourselves // - XrdXrootdMonitor::Sched->Schedule((XrdJob *)this, time(0)+repTime); + XrdXrootdMonInfo::Sched->Schedule((XrdJob *)this, time(0)+repTime); } /******************************************************************************/ @@ -318,21 +319,16 @@ void XrdXrootdMonFile::DoXFR(XrdXrootdFileStats *fsP) /* I n i t */ /******************************************************************************/ -bool XrdXrootdMonFile::Init(XrdScheduler *sp, XrdSysError *errp, int bfsz) +bool XrdXrootdMonFile::Init(int bfsz) { XrdXrootdMonFile *mfP; int alignment, pagsz = getpagesize(); -// Set the variables -// - Sched = sp; - eDest = errp; - // Allocate a socket buffer // alignment = (bfsz < pagsz ? 1024 : pagsz); if (posix_memalign((void **)&repBuff, alignment, bfsz)) - {eDest->Emsg("MonFile", "Unable to allocate monitor buffer."); + {XrdXrootdMonInfo::eDest->Emsg("MonFile", "Unable to allocate monitor buffer."); return false; } @@ -341,7 +337,7 @@ bool XrdXrootdMonFile::Init(XrdScheduler *sp, XrdSysError *errp, int bfsz) repHdr = (XrdXrootdMonHeader *)repBuff; repHdr->code = XROOTD_MON_MAPFSTA; repHdr->pseq = 0; - repHdr->stod = XrdXrootdMonitor::startTime; + repHdr->stod = XrdXrootdMonInfo::startTime; // Set the time record (always present) // @@ -389,7 +385,7 @@ bool XrdXrootdMonFile::Init(XrdScheduler *sp, XrdSysError *errp, int bfsz) // Schedule an the flushes // - XrdXrootdMonitor::Sched->Schedule((XrdJob *)mfP, time(0)+repTime); + XrdXrootdMonInfo::Sched->Schedule((XrdJob *)mfP, time(0)+repTime); return true; } diff --git a/src/XrdXrootd/XrdXrootdMonFile.hh b/src/XrdXrootd/XrdXrootdMonFile.hh index 16fad210b3f..62ab8a5a8bf 100644 --- a/src/XrdXrootd/XrdXrootdMonFile.hh +++ b/src/XrdXrootd/XrdXrootdMonFile.hh @@ -35,8 +35,6 @@ #include "XrdXrootd/XrdXrootdMonFMap.hh" #include "XrdXrootd/XrdXrootdMonitor.hh" -class XrdScheduler; -class XrdSysError; class XrdXrootdFileStats; class XrdXrootdMonHeader; class XrdXrootdMonTrace; @@ -53,7 +51,7 @@ static void Disc(unsigned int usrID); void DoIt(); -static bool Init(XrdScheduler *sp, XrdSysError *errp, int bfsz=65472); +static bool Init(int bfsz=65472); static void Open(XrdXrootdFileStats *fsP, const char *Path, unsigned int uDID, bool isRW); @@ -68,8 +66,6 @@ static void DoXFR(XrdXrootdFileStats *fsP); static void Flush(); static char *GetSlot(int slotSZ); -static XrdSysError *eDest; -static XrdScheduler *Sched; static XrdSysMutex bfMutex; static XrdSysMutex fmMutex; static XrdXrootdMonFMap fmMap[XrdXrootdMonFMap::mapNum]; diff --git a/src/XrdXrootd/XrdXrootdMonitor.cc b/src/XrdXrootd/XrdXrootdMonitor.cc index f1fac80b5bd..79ee98735e2 100644 --- a/src/XrdXrootd/XrdXrootdMonitor.cc +++ b/src/XrdXrootd/XrdXrootdMonitor.cc @@ -55,8 +55,6 @@ /* S t a t i c A l l o c a t i o n */ /******************************************************************************/ -XrdScheduler *XrdXrootdMonitor::Sched = 0; -XrdSysError *XrdXrootdMonitor::eDest = 0; char *XrdXrootdMonitor::idRec = 0; int XrdXrootdMonitor::idLen = 0; char *XrdXrootdMonitor::Dest1 = 0; @@ -67,7 +65,6 @@ int XrdXrootdMonitor::monMode2 = 0; XrdNetMsg *XrdXrootdMonitor::InetDest2 = 0; XrdXrootdMonitor *XrdXrootdMonitor::altMon = 0; XrdSysMutex XrdXrootdMonitor::windowMutex; -kXR_int32 XrdXrootdMonitor::startTime = 0; int XrdXrootdMonitor::monRlen = 0; XrdXrootdMonitor::MonRdrBuff XrdXrootdMonitor::rdrMon[XrdXrootdMonitor::rdrMax]; @@ -108,7 +105,13 @@ extern XrdOucTrace *XrdXrootdTrace; namespace XrdXrootdMonInfo { -long long mySID = 0; +XrdScheduler *Sched = 0; +XrdSysError *eDest = 0; +char *monHost = 0; +long long mySID = 0; +int32_t startTime = htonl(time(0)); +int seq = 0; +XrdSysMutex seqMutex; } using namespace XrdXrootdMonInfo; @@ -558,7 +561,6 @@ int XrdXrootdMonitor::Init(XrdScheduler *sp, XrdSysError *errp, // Sched = sp; eDest = errp; - startTime = htonl(Now); // Generate our server ID // @@ -571,6 +573,7 @@ int XrdXrootdMonitor::Init(XrdScheduler *sp, XrdSysError *errp, if (sidSize >= (int)sizeof(sidName)) sName[sizeof(sidName)-1] = 0; strcpy(sidName, sName); free(sName); + monHost = strdup(iHost); // There is nothing to do unless we have been enabled via Defaults() // @@ -626,7 +629,7 @@ int XrdXrootdMonitor::Init(XrdScheduler *sp, XrdSysError *errp, // If we are monitoring file stats then start that up // if (!Sched || !monFSTAT) monFSTAT = 0; - else if (!XrdXrootdMonFile::Init(Sched, eDest)) return 0; + else if (!XrdXrootdMonFile::Init()) return 0; // If we are not monitoring redirections, we are done! // @@ -899,9 +902,7 @@ unsigned char XrdXrootdMonitor::do_Shift(long long xTot, unsigned int &xVal) void XrdXrootdMonitor::fillHeader(XrdXrootdMonHeader *hdr, const char id, int size) -{ static XrdSysMutex seqMutex; - static int seq = 0; - int myseq; +{ int myseq; // Generate a new sequence number // diff --git a/src/XrdXrootd/XrdXrootdMonitor.hh b/src/XrdXrootd/XrdXrootdMonitor.hh index 0c9ab44dc47..c2d1717c0c8 100644 --- a/src/XrdXrootd/XrdXrootdMonitor.hh +++ b/src/XrdXrootd/XrdXrootdMonitor.hh @@ -54,6 +54,8 @@ #define XROOTD_MON_REDR 64 #define XROOTD_MON_IOV 128 #define XROOTD_MON_FSTA 256 +#define XROOTD_MON_CCM 512 +#define XROOTD_MON_PFC 1024 #define XROOTD_MON_FSLFN 1 #define XROOTD_MON_FSOPS 2 @@ -113,12 +115,17 @@ static void Defaults(int msz, int rsz, int wsz, int flush, int flash, int iDent, int rnm, int fsint=0, int fsopt=0, int fsion=0); +static int Flushing() {return autoFlush;} + static void Ident() {Send(-1, idRec, idLen);} static int Init(XrdScheduler *sp, XrdSysError *errp, const char *iHost, const char *iProg, const char *iName, int Port); +static bool ModeEnabled(int mode) + {return ((monMode1|monMode2) & mode) != 0;} + void Open(kXR_unt32 dictid, off_t fsize); static int Redirect() {return monREDR;} @@ -126,6 +133,8 @@ static int Redirect() {return monREDR;} static int Redirect(kXR_unt32 mID, const char *hName, int Port, const char opC, const char *Path); +static int Send(int mmode, void *buff, int size); + static time_t Tick(); class User @@ -222,12 +231,9 @@ static kXR_unt32 GetDictID(); static kXR_unt32 Map(char code, XrdXrootdMonitor::User &uInfo, const char *path); void Mark(); -static int Send(int mmode, void *buff, int size); static void startClock(); static void unAlloc(XrdXrootdMonitor *monp); -static XrdScheduler *Sched; -static XrdSysError *eDest; static XrdSysMutex windowMutex; static char *idRec; static int idLen; @@ -245,7 +251,6 @@ static int lastRnt; static int autoFlash; static int autoFlush; static int FlushTime; -static kXR_int32 startTime; kXR_int32 lastWindow; static kXR_int32 currWindow; static int rdrTOD; diff --git a/src/XrdXrootd/XrdXrootdProtocol.hh b/src/XrdXrootd/XrdXrootdProtocol.hh index 8fdc898e483..19aaf47d143 100644 --- a/src/XrdXrootd/XrdXrootdProtocol.hh +++ b/src/XrdXrootd/XrdXrootdProtocol.hh @@ -208,6 +208,7 @@ static void PidFile(); static int rpCheck(char *fn, char **opaque); int rpEmsg(const char *op, char *fn); int vpEmsg(const char *op, char *fn); +static void ConfigGStream(XrdOucEnv &myEnv); static int Squash(char *); int StatGen(struct stat &buf, char *xxBuff, int xxLen, bool xa=false); static int xapath(XrdOucStream &Config);