diff --git a/src/XrdFileCache.cmake b/src/XrdFileCache.cmake index f241adc7fec..2dc0d84bea9 100644 --- a/src/XrdFileCache.cmake +++ b/src/XrdFileCache.cmake @@ -17,7 +17,8 @@ add_library( ${LIB_XRD_FILECACHE} MODULE XrdFileCache/XrdFileCache.cc XrdFileCache/XrdFileCache.hh - XrdFileCache/XrdFileCacheFactory.cc XrdFileCache/XrdFileCacheFactory.hh + XrdFileCache/XrdFileCacheConfiguration.cc + XrdFileCache/XrdFileCachePurge.cc XrdFileCache/XrdFileCacheFile.cc XrdFileCache/XrdFileCacheFile.hh XrdFileCache/XrdFileCacheVRead.cc XrdFileCache/XrdFileCacheStats.hh diff --git a/src/XrdFileCache/XrdFileCache.cc b/src/XrdFileCache/XrdFileCache.cc index a103b032176..7572103dfb3 100644 --- a/src/XrdFileCache/XrdFileCache.cc +++ b/src/XrdFileCache/XrdFileCache.cc @@ -27,15 +27,27 @@ #include "XrdSys/XrdSysTimer.hh" #include "XrdOss/XrdOss.hh" #include "XrdOuc/XrdOucEnv.hh" +#include "XrdOuc/XrdOucUtils.hh" #include "XrdFileCache.hh" #include "XrdFileCacheIOEntireFile.hh" #include "XrdFileCacheIOFileBlock.hh" -#include "XrdFileCacheFactory.hh" +//#include "XrdFileCacheConfiguration.cc" +//#include "XrdFileCachePurge.cc" using namespace XrdFileCache; + + +Cache * Cache::m_factory = NULL; + +void *CacheDirCleanupThread(void* cache_void) +{ + Cache::GetInstance().CacheDirCleanup(); + return NULL; +} + void *ProcessWriteTaskThread(void* c) { Cache *cache = static_cast(c); @@ -49,12 +61,73 @@ void *PrefetchThread(void* ptr) cache->Prefetch(); return NULL; } + + +extern "C" +{ +XrdOucCache *XrdOucGetCache(XrdSysLogger *logger, + const char *config_filename, + const char *parameters) +{ + XrdSysError err(0, ""); + err.logger(logger); + err.Emsg("Retrieve", "Retrieving a caching proxy factory."); + Cache &factory = Cache::GetInstance(); + if (!factory.Config(logger, config_filename, parameters)) + { + err.Emsg("Retrieve", "Error - unable to create a factory."); + return NULL; + } + err.Emsg("Retrieve", "Success - returning a factory."); + + + pthread_t tid; + XrdSysThread::Run(&tid, CacheDirCleanupThread, NULL, 0, "XrdFileCache CacheDirCleanup"); + return &factory; +} +} + +Cache &Cache::GetInstance() +{ + if (m_factory == NULL) + m_factory = new Cache(); + return *m_factory; +} + +// !AMT will be obsolete in future +XrdOucCache *Cache::Create(Parms & parms, XrdOucCacheIO::aprParms * prParms) +{ + return this; +} + + +//______________________________________________________________________________ + +bool Cache::Decide(XrdOucCacheIO* io) +{ + if (!m_decisionpoints.empty()) + { + std::string filename = io->Path(); + std::vector::const_iterator it; + for (it = m_decisionpoints.begin(); it != m_decisionpoints.end(); ++it) + { + XrdFileCache::Decision *d = *it; + if (!d) continue; + if (!d->Decide(filename, *m_output_fs)) + { + return false; + } + } + } + + return true; +} //______________________________________________________________________________ -Cache::Cache(XrdOucCacheStats & stats) : XrdOucCache(), +Cache::Cache() : XrdOucCache(), + m_log(0, "XrdFileCache_"), m_prefetch_condVar(0), - m_stats(stats), m_RAMblocks_used(0) { pthread_t tid1; @@ -68,11 +141,11 @@ Cache::Cache(XrdOucCacheStats & stats) : XrdOucCache(), XrdOucCacheIO *Cache::Attach(XrdOucCacheIO *io, int Options) { - if (Factory::GetInstance().Decide(io)) + if (Cache::GetInstance().Decide(io)) { clLog()->Info(XrdCl::AppMsg, "Cache::Attach() %s", io->Path()); IO* cio; - if (Factory::GetInstance().RefConfiguration().m_hdfsmode) + if (Cache::GetInstance().RefConfiguration().m_hdfsmode) cio = new IOFileBlock(*io, m_stats, *this); else cio = new IOEntireFile(*io, m_stats, *this); @@ -179,7 +252,7 @@ bool Cache::RequestRAMBlock() { XrdSysMutexHelper lock(&m_RAMblock_mutex); - if ( m_RAMblocks_used < Factory::GetInstance().RefConfiguration().m_NRamBuffers ) + if ( m_RAMblocks_used < Cache::GetInstance().RefConfiguration().m_NRamBuffers ) { m_RAMblocks_used++; return true; @@ -215,7 +288,7 @@ Cache::RegisterPrefetchFile(File* file) { // called from File::Open() - if (Factory::GetInstance().RefConfiguration().m_prefetch) + if (Cache::GetInstance().RefConfiguration().m_prefetch) { XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg, "Cache::Register new file BEGIN"); @@ -269,7 +342,7 @@ Cache::GetNextFileToPrefetch() void Cache::Prefetch() { - const static int limitRAM= Factory::GetInstance().RefConfiguration().m_NRamBuffers * 0.7; + const static int limitRAM= Cache::GetInstance().RefConfiguration().m_NRamBuffers * 0.7; XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg, "Cache::Prefetch thread start"); diff --git a/src/XrdFileCache/XrdFileCache.hh b/src/XrdFileCache/XrdFileCache.hh index af616ec0c75..34086a35e1f 100644 --- a/src/XrdFileCache/XrdFileCache.hh +++ b/src/XrdFileCache/XrdFileCache.hh @@ -20,10 +20,15 @@ #include #include +#include "XrdVersion.hh" #include "XrdSys/XrdSysPthread.hh" #include "XrdOuc/XrdOucCache.hh" #include "XrdCl/XrdClDefaultEnv.hh" #include "XrdFileCacheFile.hh" +#include "XrdFileCacheDecision.hh" + +class XrdOucStream; +class XrdSysError; namespace XrdCl { class Log; @@ -35,6 +40,38 @@ class IO; namespace XrdFileCache { + //---------------------------------------------------------------------------- + //! Contains parameters configurable from the xrootd config file. + //---------------------------------------------------------------------------- + struct Configuration + { + Configuration() : + m_hdfsmode(false), + m_diskUsageLWM(-1), + m_diskUsageHWM(-1), + m_bufferSize(1024*1024), + m_NRamBuffers(8000), + m_prefetch(false), + m_prefetch_max_blocks(10), + m_hdfsbsize(128*1024*1024) {} + + bool m_hdfsmode; //!< flag for enabling block-level operation + std::string m_cache_dir; //!< path of disk cache + std::string m_username; //!< username passed to oss plugin + + long long m_diskUsageLWM; //!< cache purge low water mark + long long m_diskUsageHWM; //!< cache purge high water mark + + long long m_bufferSize; //!< prefetch buffer size, default 1MB + int m_NRamBuffers; //!< number of total in-memory cache blocks + bool m_prefetch; //!< prefetch enable state + size_t m_prefetch_max_blocks;//!< maximum number of blocks to prefetch per file + + long long m_hdfsbsize; //!< used with m_hdfsmode, default 128MB + }; + + + //---------------------------------------------------------------------------- //! Attaches/creates and detaches/deletes cache-io objects for disk based cache. //---------------------------------------------------------------------------- @@ -44,7 +81,7 @@ namespace XrdFileCache //--------------------------------------------------------------------- //! Constructor //--------------------------------------------------------------------- - Cache(XrdOucCacheStats&); + Cache(); //--------------------------------------------------------------------- //! Obtain a new IO object that fronts existing XrdOucCacheIO. @@ -57,11 +94,49 @@ namespace XrdFileCache virtual int isAttached(); //--------------------------------------------------------------------- - //! \brief Unused abstract method. Plugin instantiation role is given - //! to the Factory class. + // this is an obsolete method + virtual XrdOucCache* Create(XrdOucCache::Parms&, XrdOucCacheIO::aprParms*); + + //-------------------------------------------------------------------- + //! \brief Makes decision if the original XrdOucCacheIO should be cached. + //! + //! @param & URL of file + //! + //! @return decision if IO object will be cached. + //-------------------------------------------------------------------- + bool Decide(XrdOucCacheIO*); + + //------------------------------------------------------------------------ + //! Reference XrdFileCache configuration + //------------------------------------------------------------------------ + const Configuration& RefConfiguration() const { return m_configuration; } + + + //--------------------------------------------------------------------- + //! \brief Parse configuration file + //! + //! @param logger xrootd logger + //! @param config_filename path to configuration file + //! @param parameters optional parameters to be passed + //! + //! @return parse status + //--------------------------------------------------------------------- + bool Config(XrdSysLogger *logger, const char *config_filename, const char *parameters); + + //--------------------------------------------------------------------- + //! Singleton access. + //--------------------------------------------------------------------- + static Cache &GetInstance(); + + //--------------------------------------------------------------------- + //! Version check. + //--------------------------------------------------------------------- + static bool VCheck(XrdVersionInfo &urVersion) { return true; } + + //--------------------------------------------------------------------- + //! Thread function running disk cache purge periodically. //--------------------------------------------------------------------- - virtual XrdOucCache* Create(XrdOucCache::Parms&, XrdOucCacheIO::aprParms*) - { return NULL; } + void CacheDirCleanup(); //--------------------------------------------------------------------- //! Add downloaded block in write queue. @@ -98,13 +173,31 @@ namespace XrdFileCache //! Decrease attached count. Called from IO::Detach(). void Detach(XrdOucCacheIO *); + XrdOss* GetOss() const { return m_output_fs; } + + XrdSysError& GetSysError() { return m_log; } + + private: + bool ConfigParameters(std::string, XrdOucStream&); + bool ConfigXeq(char *, XrdOucStream &); + bool xdlib(XrdOucStream &); + static Cache *m_factory; //!< this object + + XrdSysError m_log; //!< XrdFileCache namespace logger + XrdOucCacheStats m_stats; //!< + XrdOss *m_output_fs; //!< disk cache file system + + std::vector m_decisionpoints; //!< decision plugins + + std::map m_filesInQueue; + + Configuration m_configuration; //!< configurable parameters //! Short log alias. XrdCl::Log* clLog() const { return XrdCl::DefaultEnv::GetLog(); } XrdSysCondVar m_prefetch_condVar; //!< central lock for this class - XrdOucCacheStats &m_stats; //!< global cache usage statistics XrdSysMutex m_RAMblock_mutex; //!< central lock for this class int m_RAMblocks_used; diff --git a/src/XrdFileCache/XrdFileCacheFile.cc b/src/XrdFileCache/XrdFileCacheFile.cc index 41fc3170133..9726888a585 100644 --- a/src/XrdFileCache/XrdFileCacheFile.cc +++ b/src/XrdFileCache/XrdFileCacheFile.cc @@ -35,7 +35,6 @@ #include "XrdSfs/XrdSfsInterface.hh" #include "XrdPosix/XrdPosixFile.hh" #include "XrdPosix/XrdPosix.hh" -#include "XrdFileCacheFactory.hh" #include "XrdFileCache.hh" #include "Xrd/XrdScheduler.hh" @@ -68,14 +67,14 @@ namespace namespace { - Cache* cache() { return Factory::GetInstance().GetCache(); } + Cache* cache() { return &Cache::GetInstance(); } } File::File(XrdOucCacheIO &inputIO, std::string& disk_file_path, long long iOffset, long long iFileSize) : m_input(inputIO), m_output(NULL), m_infoFile(NULL), -m_cfi(Factory::GetInstance().RefConfiguration().m_bufferSize), +m_cfi(Cache::GetInstance().RefConfiguration().m_bufferSize), m_temp_filename(disk_file_path), m_offset(iOffset), m_fileSize(iFileSize), @@ -223,11 +222,11 @@ bool File::Open() { clLog()->Dump(XrdCl::AppMsg, "File::Open() open file for disk cache %s", m_input.Path()); - XrdOss &m_output_fs = *Factory::GetInstance().GetOss(); + XrdOss &m_output_fs = *Cache::GetInstance().GetOss(); // Create the data file itself. XrdOucEnv myEnv; - m_output_fs.Create(Factory::GetInstance().RefConfiguration().m_username.c_str(), m_temp_filename.c_str(), 0600, myEnv, XRDOSS_mkpath); - m_output = m_output_fs.newFile(Factory::GetInstance().RefConfiguration().m_username.c_str()); + m_output_fs.Create(Cache::GetInstance().RefConfiguration().m_username.c_str(), m_temp_filename.c_str(), 0600, myEnv, XRDOSS_mkpath); + m_output = m_output_fs.newFile(Cache::GetInstance().RefConfiguration().m_username.c_str()); if (m_output) { int res = m_output->Open(m_temp_filename.c_str(), O_RDWR, 0600, myEnv); @@ -248,8 +247,8 @@ bool File::Open() // Create the info file std::string ifn = m_temp_filename + Info::m_infoExtension; - m_output_fs.Create(Factory::GetInstance().RefConfiguration().m_username.c_str(), ifn.c_str(), 0600, myEnv, XRDOSS_mkpath); - m_infoFile = m_output_fs.newFile(Factory::GetInstance().RefConfiguration().m_username.c_str()); + m_output_fs.Create(Cache::GetInstance().RefConfiguration().m_username.c_str(), ifn.c_str(), 0600, myEnv, XRDOSS_mkpath); + m_infoFile = m_output_fs.newFile(Cache::GetInstance().RefConfiguration().m_username.c_str()); if (m_infoFile) { int res = m_infoFile->Open(ifn.c_str(), O_RDWR, 0600, myEnv); @@ -266,11 +265,11 @@ bool File::Open() return false; } - if (m_cfi.Read(m_infoFile, Factory::GetInstance().RefConfiguration().m_prefetch) <= 0) + if (m_cfi.Read(m_infoFile, Cache::GetInstance().RefConfiguration().m_prefetch) <= 0) { int ss = (m_fileSize - 1)/m_cfi.GetBufferSize() + 1; clLog()->Info(XrdCl::AppMsg, "Creating new file info with size %lld. Reserve space for %d blocks %s", m_fileSize, ss, m_input.Path()); - m_cfi.ResizeBits(ss, Factory::GetInstance().RefConfiguration().m_prefetch); + m_cfi.ResizeBits(ss, Cache::GetInstance().RefConfiguration().m_prefetch); m_cfi.WriteHeader(m_infoFile); } else @@ -350,7 +349,7 @@ Block* File::RequestBlock(int i, bool prefetch) clLog()->Dump(XrdCl::AppMsg, "File::RequestBlock() this = %p, b=%p, this idx=%d pOn=(%d) %s", (void*)this, (void*)b, i, prefetch, lPath()); m_block_map[i] = b; - if (m_prefetchState == kOn && m_block_map.size() > Factory::GetInstance().RefConfiguration().m_prefetch_max_blocks) + if (m_prefetchState == kOn && m_block_map.size() > Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks) { m_prefetchState = kHold; cache()->DeRegisterPrefetchFile(this); @@ -829,7 +828,7 @@ void File::free_block(Block* b) cache()->RAMBlockReleased(); } - if (m_prefetchState == kHold && m_block_map.size() < Factory::GetInstance().RefConfiguration().m_prefetch_max_blocks) + if (m_prefetchState == kHold && m_block_map.size() < Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks) { m_prefetchState = kOn; cache()->RegisterPrefetchFile(this); @@ -956,7 +955,7 @@ void File::Prefetch() //______________________________________________________________________________ void File::CheckPrefetchStatRAM(Block* b) { - if (Factory::GetInstance().RefConfiguration().m_prefetch) { + if (Cache::GetInstance().RefConfiguration().m_prefetch) { if (b->m_prefetch) { m_prefetchHitCnt++; m_prefetchScore = float(m_prefetchHitCnt)/m_prefetchReadCnt; @@ -967,7 +966,7 @@ void File::CheckPrefetchStatRAM(Block* b) //______________________________________________________________________________ void File::CheckPrefetchStatDisk(int idx) { - if (Factory::GetInstance().RefConfiguration().m_prefetch) { + if (Cache::GetInstance().RefConfiguration().m_prefetch) { if (m_cfi.TestPrefetchBit(idx)) m_prefetchHitCnt++; } diff --git a/src/XrdFileCache/XrdFileCacheIOEntireFile.cc b/src/XrdFileCache/XrdFileCacheIOEntireFile.cc index e7728db67e2..ee620601077 100644 --- a/src/XrdFileCache/XrdFileCacheIOEntireFile.cc +++ b/src/XrdFileCache/XrdFileCacheIOEntireFile.cc @@ -25,7 +25,6 @@ #include "XrdFileCacheIOEntireFile.hh" #include "XrdFileCacheStats.hh" -#include "XrdFileCacheFactory.hh" using namespace XrdFileCache; @@ -39,7 +38,7 @@ IOEntireFile::IOEntireFile(XrdOucCacheIO &io, XrdOucCacheStats &stats, Cache & c clLog()->Info(XrdCl::AppMsg, "IO::IO() [%p] %s", this, m_io.Path()); XrdCl::URL url(io.Path()); - std::string fname = Factory::GetInstance().RefConfiguration().m_cache_dir + url.GetPath(); + std::string fname = Cache::GetInstance().RefConfiguration().m_cache_dir + url.GetPath(); m_file = new File(io, fname, 0, io.FSize()); } diff --git a/src/XrdFileCache/XrdFileCacheIOFileBlock.cc b/src/XrdFileCache/XrdFileCacheIOFileBlock.cc index 1b63bf17c11..ae9b8afe206 100644 --- a/src/XrdFileCache/XrdFileCacheIOFileBlock.cc +++ b/src/XrdFileCache/XrdFileCacheIOFileBlock.cc @@ -25,7 +25,6 @@ #include "XrdFileCacheIOFileBlock.hh" #include "XrdFileCache.hh" #include "XrdFileCacheStats.hh" -#include "XrdFileCacheFactory.hh" #include "XrdSys/XrdSysError.hh" #include "XrdSfs/XrdSfsInterface.hh" @@ -37,7 +36,7 @@ using namespace XrdFileCache; IOFileBlock::IOFileBlock(XrdOucCacheIO &io, XrdOucCacheStats &statsGlobal, Cache & cache) : IO(io, statsGlobal, cache) { - m_blocksize = Factory::GetInstance().RefConfiguration().m_hdfsbsize; + m_blocksize = Cache::GetInstance().RefConfiguration().m_hdfsbsize; GetBlockSizeFromPath(); } @@ -87,7 +86,7 @@ void IOFileBlock::GetBlockSizeFromPath() File* IOFileBlock::newBlockFile(long long off, int blocksize, XrdOucCacheIO* io) { XrdCl::URL url(io->Path()); - std::string fname = Factory::GetInstance().RefConfiguration().m_cache_dir + url.GetPath(); + std::string fname = Cache::GetInstance().RefConfiguration().m_cache_dir + url.GetPath(); std::stringstream ss; ss << fname; diff --git a/src/XrdFileCache/XrdFileCacheVRead.cc b/src/XrdFileCache/XrdFileCacheVRead.cc index cbfdb3ae471..451502ebc01 100644 --- a/src/XrdFileCache/XrdFileCacheVRead.cc +++ b/src/XrdFileCache/XrdFileCacheVRead.cc @@ -1,6 +1,4 @@ - #include "XrdFileCacheFile.hh" -#include "XrdFileCacheFactory.hh" #include "XrdFileCache.hh" #include "XrdFileCacheInfo.hh" @@ -189,7 +187,7 @@ bool File::VReadPreProcess(const XrdOucIOVec *readV, int n, ReadVBlockListRAM& b clLog()->Debug(XrdCl::AppMsg, "VReadPreProcess block %d , chunk idx = %d on disk", block_idx,iov_idx ); } else { - if ( Factory::GetInstance().GetCache()->HaveFreeWritingSlots() && Factory::GetInstance().GetCache()->RequestRAMBlock()) + if ( Cache::GetInstance().HaveFreeWritingSlots() && Cache::GetInstance().RequestRAMBlock()) { Block *b = RequestBlock(block_idx, false); if (!b) return false;