diff --git a/src/XrdFileCache/XrdFileCache.cc b/src/XrdFileCache/XrdFileCache.cc index 1c69e2f9386..25664c6e32f 100644 --- a/src/XrdFileCache/XrdFileCache.cc +++ b/src/XrdFileCache/XrdFileCache.cc @@ -18,11 +18,13 @@ #include #include +#include #include #include "XrdCl/XrdClConstants.hh" #include "XrdCl/XrdClURL.hh" #include "XrdSys/XrdSysPthread.hh" +#include "XrdSys/XrdSysTimer.hh" #include "XrdOss/XrdOss.hh" #include "XrdOuc/XrdOucEnv.hh" @@ -33,6 +35,7 @@ using namespace XrdFileCache; + void *ProcessWriteTaskThread(void* c) { Cache *cache = static_cast(c); @@ -40,14 +43,26 @@ void *ProcessWriteTaskThread(void* c) return NULL; } -Cache::Cache(XrdOucCacheStats & stats) - : m_attached(0), +void *PrefetchThread(void* ptr) +{ + Cache* cache = static_cast(ptr); + cache->Prefetch(); + return NULL; +} +//______________________________________________________________________________ + + +Cache::Cache(XrdOucCacheStats & stats) : XrdOucCache(), m_stats(stats), m_RAMblocks_used(0) { - pthread_t tid; - XrdSysThread::Run(&tid, ProcessWriteTaskThread, (void*)this, 0, "XrdFileCache WriteTasks "); + pthread_t tid1; + XrdSysThread::Run(&tid1, ProcessWriteTaskThread, (void*)this, 0, "XrdFileCache WriteTasks "); + + pthread_t tid2; + XrdSysThread::Run(&tid2, PrefetchThread, (void*)this, 0, "XrdFileCache Prefetch "); } + //______________________________________________________________________________ XrdOucCacheIO *Cache::Attach(XrdOucCacheIO *io, int Options) @@ -55,10 +70,6 @@ XrdOucCacheIO *Cache::Attach(XrdOucCacheIO *io, int Options) if (Factory::GetInstance().Decide(io)) { clLog()->Info(XrdCl::AppMsg, "Cache::Attach() %s", io->Path()); - { - XrdSysMutexHelper lock(&m_io_mutex); - m_attached++; - } IO* cio; if (Factory::GetInstance().RefConfiguration().m_hdfsmode) cio = new IOFileBlock(*io, m_stats, *this); @@ -78,17 +89,13 @@ XrdOucCacheIO *Cache::Attach(XrdOucCacheIO *io, int Options) int Cache::isAttached() { - XrdSysMutexHelper lock(&m_io_mutex); - return m_attached; + // virutal function of XrdOucCache, don't see it used in pfc or posix layer + return true; } void Cache::Detach(XrdOucCacheIO* io) { clLog()->Info(XrdCl::AppMsg, "Cache::Detach() %s", io->Path()); - { - XrdSysMutexHelper lock(&m_io_mutex); - m_attached--; - } delete io; } @@ -167,6 +174,7 @@ Cache::ProcessWriteTasks() block->m_file->WriteBlockToDisk(block); } } + //______________________________________________________________________________ bool @@ -178,7 +186,6 @@ Cache::RequestRAMBlock() m_RAMblocks_used++; return true; } - return false; } @@ -189,3 +196,84 @@ Cache::RAMBlockReleased() m_RAMblocks_used--; } + +//============================================================================== +//======================= PREFETCH =================================== +//============================================================================== + +namespace { +struct prefetch_less_than +{ + inline bool operator() (const File* struct1, const File* struct2) + { + return (struct1->GetPrefetchScore() < struct2->GetPrefetchScore()); + } +}myobject; +} +//______________________________________________________________________________ + +void +Cache::RegisterPrefetchFile(File* file) +{ + // called from File::Open() + + if (Factory::GetInstance().RefConfiguration().m_prefetch) + { + if (Factory::GetInstance().RefConfiguration().m_hdfsmode) { + XrdSysMutexHelper lock(&m_prefetch_mutex); + m_files.push_back(file); + } + else + { + // don't need to lock it becuse it File object is created in constructor of IOEntireFile + m_files.push_back(file); + } + } +} +//______________________________________________________________________________ + +void +Cache::DeRegisterPrefetchFile(File* file) +{ + // called from last line File::InitiateClose() + + XrdSysMutexHelper lock(&m_prefetch_mutex); + for (FileList::iterator it = m_files.begin(); it != m_files.end(); ++it) { + if (*it == file) { + m_files.erase(it); + break; + } + } +} +//______________________________________________________________________________ + +File* +Cache::GetNextFileToPrefetch() +{ + XrdSysMutexHelper lock(&m_prefetch_mutex); + if (m_files.empty()) + return 0; + + std::sort(m_files.begin(), m_files.end(), myobject); + File* f = m_files.back(); + f->MarkPrefetch(); + return f; +} + +//______________________________________________________________________________ + + +void +Cache::Prefetch() +{ + while (true) { + File* f = GetNextFileToPrefetch(); + if (f) { + f->Prefetch(); + } + else { + // wait for new file, AMT should I wait for the signal instead ??? + XrdSysTimer::Wait(10); + } + } +} diff --git a/src/XrdFileCache/XrdFileCache.hh b/src/XrdFileCache/XrdFileCache.hh index 6d3c4e98292..9a2451e9cdd 100644 --- a/src/XrdFileCache/XrdFileCache.hh +++ b/src/XrdFileCache/XrdFileCache.hh @@ -86,6 +86,13 @@ namespace XrdFileCache void RAMBlockReleased(); + void RegisterPrefetchFile(File*); + void DeRegisterPrefetchFile(File*); + + File* GetNextFileToPrefetch(); + + void Prefetch(); + private: //! Decrease attached count. Called from IO::Detach(). void Detach(XrdOucCacheIO *); @@ -96,8 +103,7 @@ namespace XrdFileCache //! Short log alias. XrdCl::Log* clLog() const { return XrdCl::DefaultEnv::GetLog(); } - XrdSysMutex m_io_mutex; //!< central lock for this class - unsigned int m_attached; //!< number of attached IO objects + XrdSysMutex m_prefetch_mutex; //!< central lock for this class XrdOucCacheStats &m_stats; //!< global cache usage statistics XrdSysMutex m_RAMblock_mutex; //!< central lock for this class @@ -113,6 +119,9 @@ namespace XrdFileCache WriteQ s_writeQ; + // prefetching + typedef std::vector FileList; + FileList m_files; }; //---------------------------------------------------------------------------- diff --git a/src/XrdFileCache/XrdFileCacheFile.cc b/src/XrdFileCache/XrdFileCacheFile.cc index daab517f0de..1e66f9efb33 100644 --- a/src/XrdFileCache/XrdFileCacheFile.cc +++ b/src/XrdFileCache/XrdFileCacheFile.cc @@ -87,7 +87,8 @@ m_non_flushed_cnt(0), m_in_sync(false), m_downloadCond(0), m_prefetchReadCnt(0), -m_prefetchHitCnt(0) +m_prefetchHitCnt(0), +m_prefetchCurrentCnt(0) { clLog()->Debug(XrdCl::AppMsg, "File::File() %s", m_input.Path()); Open(); @@ -104,14 +105,17 @@ File::~File() { m_stateCond.Lock(); bool isStopped = m_stopping; + bool isPrefetching = (m_prefetchCurrentCnt > 0); m_stateCond.UnLock(); - if (isStopped) + if ((isPrefetching == false) && isStopped) { - printf("~FILE map size %ld \n", m_block_map.size()); - if ( m_block_map.empty()) + m_downloadCond.Lock(); + bool blockMapEmpty = m_block_map.empty(); + m_downloadCond.UnLock(); + if ( blockMapEmpty) break; } - XrdSysTimer::Wait(100); + XrdSysTimer::Wait(10); } clLog()->Debug(XrdCl::AppMsg, "File::~File finished with writing %s",lPath() ); @@ -159,9 +163,12 @@ bool File::InitiateClose() { // Retruns true if delay is needed clLog()->Debug(XrdCl::AppMsg, "File::Initiate close start", lPath()); + + cache()->DeRegisterPrefetchFile(this); + m_stateCond.Lock(); m_stopping = true; - m_stateCond.UnLock(); + m_stateCond.UnLock(); if (m_cfi.IsComplete()) return false; // AMT maybe map size is here more meaningfull, but might hold block state lock return true; } @@ -231,6 +238,8 @@ bool File::Open() clLog()->Debug(XrdCl::AppMsg, "Info file read from disk: %s", m_input.Path()); } + + cache()->RegisterPrefetchFile(this); return true; } @@ -770,33 +779,40 @@ void File::AppendIOStatToFileInfo() //______________________________________________________________________________ void File::Prefetch() { - int block_idx = -1; + bool stopping = false; + m_stateCond.Lock(); + stopping = m_stopping; + m_stateCond.UnLock(); + - XrdSysCondVarHelper _lck(m_downloadCond); - // AMT can this be sorted before calling Prefetch ?? - if (m_cfi.IsComplete()) return; + if (!stopping) { + XrdSysCondVarHelper _lck(m_downloadCond); + if (m_cfi.IsComplete() == false) + { + int block_idx = -1; + // check index not on disk and not in RAM + for (int f = 0; f < m_cfi.GetSizeInBits(); ++f) + { + if (!m_cfi.TestBit(f)) + { + BlockMap_i bi = m_block_map.find(block_idx); + if (bi == m_block_map.end()) { + block_idx = f; + break; + } + } + } - // check index not on disk and not in RAM - for (int f = 0; f < m_cfi.GetSizeInBits(); ++f) - { - if (!m_cfi.TestBit(f)) - { - BlockMap_i bi = m_block_map.find(block_idx); - if (bi == m_block_map.end()) { - block_idx = f; - break; + if (cache()->RequestRAMBlock()) { + m_prefetchReadCnt++; + + Block *b = RequestBlock(block_idx, true); + inc_ref_count(b); } } } - - assert(block_idx >= 0); - - // decrease counter of globally available blocks, resources already checked in global thread - cache()->RequestRAMBlock(); - m_prefetchReadCnt++; - - Block *b = RequestBlock(block_idx, true); - inc_ref_count(b); + + UnMarkPrefetch(); } @@ -819,16 +835,33 @@ void File::CheckPrefetchStatDisk(int idx) } //______________________________________________________________________________ -float File::GetPrefetchScore() +float File::GetPrefetchScore() const { if (m_prefetchReadCnt) return m_prefetchHitCnt/m_prefetchReadCnt; - return 0; + return 1; // AMT not sure if this should be 0.5 ... ???? } -//============================================================================== +//______________________________________________________________________________ +void File::MarkPrefetch() +{ + m_stateCond.Lock(); + m_prefetchCurrentCnt++; + m_stateCond.UnLock(); + +} +//______________________________________________________________________________ +void File::UnMarkPrefetch() +{ + m_stateCond.Lock(); + m_prefetchCurrentCnt--; + m_stateCond.UnLock(); +} + +//============================================================================== +//================== RESPONSE HANDLER ================================== //============================================================================== void BlockResponseHandler::HandleResponse(XrdCl::XRootDStatus *status, @@ -865,9 +898,3 @@ void DirectResponseHandler::HandleResponse(XrdCl::XRootDStatus *status, } - -//============================================================================== - -//============================================================================== - -//============================================================================== diff --git a/src/XrdFileCache/XrdFileCacheFile.hh b/src/XrdFileCache/XrdFileCacheFile.hh index a549a73c301..8656db34a30 100644 --- a/src/XrdFileCache/XrdFileCacheFile.hh +++ b/src/XrdFileCache/XrdFileCacheFile.hh @@ -126,6 +126,8 @@ namespace XrdFileCache int m_prefetchReadCnt; int m_prefetchHitCnt; + int m_prefetchCurrentCnt; + // AMT should I cache prefetch score for optimization of Cache::getNextFileToPrefetch() ??? public: @@ -170,7 +172,9 @@ namespace XrdFileCache void Prefetch(); - float GetPrefetchScore(); + float GetPrefetchScore() const; + + void MarkPrefetch(); private: Block* RequestBlock(int i, bool prefetch); @@ -187,6 +191,8 @@ namespace XrdFileCache void CheckPrefetchStatRAM(Block* b); void CheckPrefetchStatDisk(int idx); + void UnMarkPrefetch(); + //! Short log alias. XrdCl::Log* clLog() const { return XrdCl::DefaultEnv::GetLog(); }