From 98519036e45e165a9265fb5a89a62e45652c275a Mon Sep 17 00:00:00 2001 From: Matevz Tadel Date: Wed, 11 Dec 2019 00:20:21 -0800 Subject: [PATCH 1/2] [pfc] g-stream file_close & write queue mem handling - Implement pfc g-stream file_close reporting. - Align buffers to page size. - Reuse 5% of ram of buffers. - Collect statistics of ram / write queue usage in bytes. --- src/XrdPfc/XrdPfc.cc | 249 +++++++++++++++++++++--------- src/XrdPfc/XrdPfc.hh | 60 ++++--- src/XrdPfc/XrdPfcConfiguration.cc | 22 ++- src/XrdPfc/XrdPfcFile.cc | 232 +++++++++++++++------------- src/XrdPfc/XrdPfcFile.hh | 53 ++++--- src/XrdPfc/XrdPfcIOFileBlock.cc | 30 ++-- src/XrdPfc/XrdPfcIOFileBlock.hh | 2 +- src/XrdPfc/XrdPfcInfo.cc | 20 ++- src/XrdPfc/XrdPfcInfo.hh | 27 ++-- src/XrdPfc/XrdPfcPurge.cc | 146 ++++++++++++++++-- src/XrdPfc/XrdPfcStats.hh | 2 +- src/XrdPfc/XrdPfcVRead.cc | 25 +-- 12 files changed, 577 insertions(+), 291 deletions(-) diff --git a/src/XrdPfc/XrdPfc.cc b/src/XrdPfc/XrdPfc.cc index c28f72cf79c..cb098cc52c9 100644 --- a/src/XrdPfc/XrdPfc.cc +++ b/src/XrdPfc/XrdPfc.cc @@ -23,13 +23,18 @@ #include "XrdCl/XrdClConstants.hh" #include "XrdCl/XrdClURL.hh" -#include "XrdSys/XrdSysPthread.hh" -#include "XrdSys/XrdSysTimer.hh" -#include "XrdOss/XrdOss.hh" + #include "XrdOuc/XrdOucEnv.hh" #include "XrdOuc/XrdOucUtils.hh" + +#include "XrdSys/XrdSysPthread.hh" +#include "XrdSys/XrdSysTimer.hh" #include "XrdSys/XrdSysTrace.hh" +#include "XrdXrootd/XrdXrootdGStream.hh" + +#include "XrdOss/XrdOss.hh" + #include "XrdPfc.hh" #include "XrdPfcTrace.hh" #include "XrdPfcInfo.hh" @@ -38,29 +43,33 @@ using namespace XrdPfc; -Cache * Cache::m_factory = NULL; +Cache * Cache::m_instance = 0; -XrdScheduler *Cache::schedP = NULL; +XrdScheduler *Cache::schedP = 0; -void *PurgeThread(void* cache_void) +void *ResourceMonitorHeartBeatThread(void*) +{ + Cache::GetInstance().ResourceMonitorHeartBeat(); + return 0; +} + +void *PurgeThread(void*) { Cache::GetInstance().Purge(); - return NULL; + return 0; } 
-void *ProcessWriteTaskThread(void* c) +void *ProcessWriteTaskThread(void*) { - Cache *cache = static_cast(c); - cache->ProcessWriteTasks(); - return NULL; + Cache::GetInstance().ProcessWriteTasks(); + return 0; } -void *PrefetchThread(void* ptr) +void *PrefetchThread(void*) { - Cache* cache = static_cast(ptr); - cache->Prefetch(); - return NULL; + Cache::GetInstance().Prefetch(); + return 0; } //============================================================================== @@ -70,43 +79,46 @@ extern "C" XrdOucCache *XrdOucGetCache(XrdSysLogger *logger, const char *config_filename, const char *parameters, - XrdOucEnv *envP) + XrdOucEnv *env) { XrdSysError err(logger, ""); err.Say("++++++ Proxy file cache initialization started."); - if ( ! envP || - ! (XrdPfc::Cache::schedP = (XrdScheduler*) envP->GetPtr("XrdScheduler*"))) + if ( ! env || + ! (XrdPfc::Cache::schedP = (XrdScheduler*) env->GetPtr("XrdScheduler*"))) { XrdPfc::Cache::schedP = new XrdScheduler; XrdPfc::Cache::schedP->Start(); } - Cache &factory = Cache::CreateInstance(logger); + Cache &instance = Cache::CreateInstance(logger, env); - if (! factory.Config(config_filename, parameters)) + if (! 
instance.Config(config_filename, parameters)) { err.Say("Config Proxy file cache initialization failed."); - return NULL; + return 0; } err.Say("------ Proxy file cache initialization completed."); - for (int wti = 0; wti < factory.RefConfiguration().m_wqueue_threads; ++wti) { - pthread_t tid1; - XrdSysThread::Run(&tid1, ProcessWriteTaskThread, (void*)(&factory), 0, "XrdPfc WriteTasks "); - } + pthread_t tid; - if (factory.RefConfiguration().m_prefetch_max_blocks > 0) - { - pthread_t tid2; - XrdSysThread::Run(&tid2, PrefetchThread, (void*)(&factory), 0, "XrdPfc Prefetch "); - } + for (int wti = 0; wti < instance.RefConfiguration().m_wqueue_threads; ++wti) + { + XrdSysThread::Run(&tid, ProcessWriteTaskThread, 0, 0, "XrdPfc WriteTasks "); + } + + if (instance.RefConfiguration().m_prefetch_max_blocks > 0) + { + XrdSysThread::Run(&tid, PrefetchThread, 0, 0, "XrdPfc Prefetch "); + } - pthread_t tid; - XrdSysThread::Run(&tid, PurgeThread, NULL, 0, "XrdPfc Purge"); + XrdSysThread::Run(&tid, ResourceMonitorHeartBeatThread, 0, 0, "XrdPfc ResourceMonitorHeartBeat"); - return &factory; + XrdSysThread::Run(&tid, PurgeThread, 0, 0, "XrdPfc Purge"); + } + + return &instance; } } @@ -133,17 +145,17 @@ void Configuration::calculate_fractional_usages(long long du, long long f //============================================================================== -Cache &Cache::CreateInstance(XrdSysLogger *logger) +Cache &Cache::CreateInstance(XrdSysLogger *logger, XrdOucEnv *env) { - assert (m_factory == NULL); - m_factory = new Cache(logger); - return *m_factory; + assert (m_instance == 0); + m_instance = new Cache(logger, env); + return *m_instance; } Cache &Cache::GetInstance() { - assert (m_factory != NULL); - return *m_factory; + assert (m_instance != 0); + return *m_instance; } bool Cache::Decide(XrdOucCacheIO* io) @@ -157,7 +169,7 @@ bool Cache::Decide(XrdOucCacheIO* io) { XrdPfc::Decision *d = *it; if (! d) continue; - if (! d->Decide(filename, *m_output_fs)) + if (! 
d->Decide(filename, *m_oss)) { return false; } @@ -167,22 +179,29 @@ bool Cache::Decide(XrdOucCacheIO* io) return true; } -Cache::Cache(XrdSysLogger *logger) : +Cache::Cache(XrdSysLogger *logger, XrdOucEnv *env) : XrdOucCache("pfc"), + m_env(env), m_log(logger, "XrdPfc_"), m_trace(new XrdSysTrace("XrdPfc", logger)), m_traceID("Manager"), + m_oss(0), + m_gstream(0), m_prefetch_condVar(0), - m_RAMblocks_used(0), + m_prefetch_enabled(false), + m_RAM_used(0), + m_RAM_write_queue(0), m_isClient(false), m_in_purge(false), m_active_cond(0), - m_fs_state(0) + m_stats_n_purge_cond(0), + m_fs_state(0), + m_last_scan_duration(0), + m_last_purge_duration(0), + m_spt_state(SPTS_Idle) { // Default log level is Warning. m_trace->What = 2; - - m_prefetch_enabled = (m_configuration.m_prefetch_max_blocks > 0); } XrdOucCacheIO *Cache::Attach(XrdOucCacheIO *io, int Options) @@ -231,6 +250,11 @@ void Cache::AddWriteTask(Block* b, bool fromRead) { TRACE(Dump, "Cache::AddWriteTask() bOff=%ld " << b->m_offset); + { + XrdSysMutexHelper lock(&m_RAM_mutex); + m_RAM_write_queue += b->get_size(); + } + m_writeQ.condVar.Lock(); if (fromRead) m_writeQ.queue.push_back(b); @@ -241,19 +265,21 @@ void Cache::AddWriteTask(Block* b, bool fromRead) m_writeQ.condVar.UnLock(); } -void Cache::RemoveWriteQEntriesFor(File *iFile) +void Cache::RemoveWriteQEntriesFor(File *file) { std::list removed_blocks; + long long sum_size = 0; m_writeQ.condVar.Lock(); std::list::iterator i = m_writeQ.queue.begin(); while (i != m_writeQ.queue.end()) { - if ((*i)->m_file == iFile) + if ((*i)->m_file == file) { - TRACE(Dump, "Cache::Remove entries for " << (void*)(*i) << " path " << iFile->lPath()); + TRACE(Dump, "Cache::Remove entries for " << (void*)(*i) << " path " << file->lPath()); std::list::iterator j = i++; removed_blocks.push_back(*j); + sum_size += (*j)->get_size(); m_writeQ.queue.erase(j); --m_writeQ.size; } @@ -264,7 +290,12 @@ void Cache::RemoveWriteQEntriesFor(File *iFile) } m_writeQ.condVar.UnLock(); - 
iFile->BlocksRemovedFromWriteQ(removed_blocks); + { + XrdSysMutexHelper lock(&m_RAM_mutex); + m_RAM_write_queue -= sum_size; + } + + file->BlocksRemovedFromWriteQ(removed_blocks); } void Cache::ProcessWriteTasks() @@ -282,13 +313,15 @@ void Cache::ProcessWriteTasks() // MT -- optimize to pop several blocks if they are available (or swap the list). // This makes sense especially for smallish block sizes. - int n_pushed = std::min(m_writeQ.size, m_configuration.m_wqueue_blocks); + int n_pushed = std::min(m_writeQ.size, m_configuration.m_wqueue_blocks); + long long sum_size = 0; for (int bi = 0; bi < n_pushed; ++bi) { Block* block = m_writeQ.queue.front(); m_writeQ.queue.pop_front(); m_writeQ.writes_between_purges += block->get_size(); + sum_size += block->get_size(); blks_to_write[bi] = block; @@ -298,6 +331,11 @@ void Cache::ProcessWriteTasks() m_writeQ.condVar.UnLock(); + { + XrdSysMutexHelper lock(&m_RAM_mutex); + m_RAM_write_queue -= sum_size; + } + for (int bi = 0; bi < n_pushed; ++bi) { Block* block = blks_to_write[bi]; @@ -307,21 +345,59 @@ void Cache::ProcessWriteTasks() } } -bool Cache::RequestRAMBlock() +//============================================================================== + +char* Cache::RequestRAM(long long size) { - XrdSysMutexHelper lock(&m_RAMblock_mutex); - if ( m_RAMblocks_used < Cache::GetInstance().RefConfiguration().m_NRamBuffers ) + static const size_t s_block_align = sysconf(_SC_PAGESIZE); + + bool std_size = (size == m_configuration.m_bufferSize); + + m_RAM_mutex.Lock(); + + long long total = m_RAM_used + size; + + if (total <= m_configuration.m_RamAbsAvailable) { - m_RAMblocks_used++; - return true; + m_RAM_used = total; + if (std_size && m_RAM_std_size > 0) + { + char *buf = m_RAM_std_blocks.back(); + m_RAM_std_blocks.pop_back(); + --m_RAM_std_size; + + m_RAM_mutex.UnLock(); + + return buf; + } + else + { + m_RAM_mutex.UnLock(); + char *buf = 0; + posix_memalign((void**) &buf, s_block_align, (size_t) size); + return buf; + } } - 
return false; + m_RAM_mutex.UnLock(); + return 0; } -void Cache::RAMBlockReleased() +void Cache::ReleaseRAM(char* buf, long long size) { - XrdSysMutexHelper lock(&m_RAMblock_mutex); - m_RAMblocks_used--; + bool std_size = (size == m_configuration.m_bufferSize); + { + XrdSysMutexHelper lock(&m_RAM_mutex); + + m_RAM_used -= size; + + if (std_size && m_RAM_std_size < m_configuration.m_RamKeepStdBlocks) + { + m_RAM_std_blocks.push_back(buf); + ++m_RAM_std_size; + return; + } + } + free(buf); } File* Cache::GetFile(const std::string& path, IO* io, long long off, long long filesize) @@ -557,6 +633,31 @@ void Cache::dec_ref_cnt(File* f, bool high_debug) m_closed_files_stats.insert(std::make_pair(f->GetLocalPath(), f->DeltaStatsFromLastCall())); } + if (m_gstream) + { + const Info::AStat *as = f->GetLastAccessStats(); + + char buf[4096]; + int len = snprintf(buf, 4096, "{\"event\":\"file_close\"," + "\"lfn\":\"%s\",\"size\":%lld,\"blk_size\":%d,\"n_blks\":%d,\"n_blks_done\":%d," + "\"access_cnt\":%lu,\"attach_t\":%lld,\"detach_t\":%lld," + "\"b_hit\":%lld,\"b_miss\":%lld,\"b_bypass\":%lld}", + f->GetLocalPath().c_str(), f->GetFileSize(), f->GetBlockSize(), + f->GetNBlocks(), f->GetNDownloadedBlocks(), + f->GetAccessCnt(), (long long) as->AttachTime, (long long) as->DetachTime, + as->BytesHit, as->BytesMissed, as->BytesBypassed + ); + bool suc = false; + if (len < 4096) + { + suc = m_gstream->Insert(buf, len + 1); + } + if ( ! 
suc) + { + TRACE(Error, "Failed g-stream insertion of file_close record."); + } + } + delete f; } } @@ -634,13 +735,13 @@ File* Cache::GetNextFileToPrefetch() void Cache::Prefetch() { - const int limitRAM = int( Cache::GetInstance().RefConfiguration().m_NRamBuffers * 0.7 ); + const long long limit_RAM = m_configuration.m_RamAbsAvailable * 7 / 10; while (true) { - m_RAMblock_mutex.Lock(); - bool doPrefetch = (m_RAMblocks_used < limitRAM); - m_RAMblock_mutex.UnLock(); + m_RAM_mutex.Lock(); + bool doPrefetch = (m_RAM_used < limit_RAM); + m_RAM_mutex.UnLock(); if (doPrefetch) { @@ -688,7 +789,7 @@ int Cache::LocalFilePath(const char *curl, char *buff, int blen, if (why == ForPath) { - return m_output_fs->Lfn2Pfn(f_name.c_str(), buff, blen); + return m_oss->Lfn2Pfn(f_name.c_str(), buff, blen); } { @@ -697,8 +798,8 @@ int Cache::LocalFilePath(const char *curl, char *buff, int blen, } struct stat sbuff, sbuff2; - if (m_output_fs->Stat(f_name.c_str(), &sbuff) == XrdOssOK && - m_output_fs->Stat(i_name.c_str(), &sbuff2) == XrdOssOK) + if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK && + m_oss->Stat(i_name.c_str(), &sbuff2) == XrdOssOK) { if ( S_ISDIR(sbuff.st_mode)) { @@ -721,7 +822,7 @@ int Cache::LocalFilePath(const char *curl, char *buff, int blen, if (is_active) m_active_cond.UnLock(); - XrdOssDF* infoFile = m_output_fs->newFile(m_configuration.m_username.c_str()); + XrdOssDF* infoFile = m_oss->newFile(m_configuration.m_username.c_str()); XrdOucEnv myEnv; int res = infoFile->Open(i_name.c_str(), O_RDWR, 0600, myEnv); if (res >= 0) @@ -750,7 +851,7 @@ int Cache::LocalFilePath(const char *curl, char *buff, int blen, { if ((is_complete || why == ForInfo) && buff != 0) { - int res2 = m_output_fs->Lfn2Pfn(f_name.c_str(), buff, blen); + int res2 = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen); if (res2 < 0) return res2; @@ -759,7 +860,7 @@ int Cache::LocalFilePath(const char *curl, char *buff, int blen, if (why == ForAccess) {mode_t mode = (forall ? 
worldReadable : groupReadable); if (((sbuff.st_mode & worldReadable) != mode) - && (m_output_fs->Chmod(f_name.c_str(),mode) != XrdOssOK)) + && (m_oss->Chmod(f_name.c_str(),mode) != XrdOssOK)) {is_complete = false; *buff = 0; } @@ -817,7 +918,7 @@ int Cache::Prepare(const char *curl, int oflags, mode_t mode) } struct stat sbuff; - int res = m_output_fs->Stat(i_name.c_str(), &sbuff); + int res = m_oss->Stat(i_name.c_str(), &sbuff); if (res == 0) { TRACE(Dump, "Cache::Prepare defer open " << f_name); @@ -848,7 +949,7 @@ int Cache::Stat(const char *curl, struct stat &sbuff) m_purge_delay_set.insert(f_name); } - if (m_output_fs->Stat(f_name.c_str(), &sbuff) == XrdOssOK) + if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK) { if (S_ISDIR(sbuff.st_mode)) { @@ -857,7 +958,7 @@ int Cache::Stat(const char *curl, struct stat &sbuff) else { bool success = false; - XrdOssDF* infoFile = m_output_fs->newFile(m_configuration.m_username.c_str()); + XrdOssDF* infoFile = m_oss->newFile(m_configuration.m_username.c_str()); XrdOucEnv myEnv; int res = infoFile->Open(i_name.c_str(), O_RDONLY, 0600, myEnv); if (res >= 0) @@ -944,8 +1045,8 @@ int Cache::UnlinkCommon(const std::string& f_name, bool fail_if_open) std::string i_name = f_name + Info::s_infoExtension; // Unlink file & cinfo - int f_ret = m_output_fs->Unlink(f_name.c_str()); - int i_ret = m_output_fs->Unlink(i_name.c_str()); + int f_ret = m_oss->Unlink(f_name.c_str()); + int i_ret = m_oss->Unlink(i_name.c_str()); TRACE(Debug, "Cache::UnlinkCommon " << f_name << ", f_ret=" << f_ret << ", i_ret=" << i_ret); diff --git a/src/XrdPfc/XrdPfc.hh b/src/XrdPfc/XrdPfc.hh index b3c8573b014..572ad621a72 100644 --- a/src/XrdPfc/XrdPfc.hh +++ b/src/XrdPfc/XrdPfc.hh @@ -35,6 +35,7 @@ class XrdOucStream; class XrdSysError; class XrdSysTrace; +class XrdXrootdGStream; namespace XrdCl { @@ -78,7 +79,7 @@ struct Configuration m_dirStats(false), m_bufferSize(1024*1024), m_RamAbsAvailable(0), - m_NRamBuffers(-1), + m_RamKeepStdBlocks(0), 
m_wqueue_blocks(16), m_wqueue_threads(4), m_prefetch_max_blocks(10), @@ -121,7 +122,7 @@ struct Configuration long long m_bufferSize; //!< prefetch buffer size, default 1MB long long m_RamAbsAvailable; //!< available from configuration - int m_NRamBuffers; //!< number of total in-memory cache blocks, cached + int m_RamKeepStdBlocks; //!< number of standard-sized blocks kept after release int m_wqueue_blocks; //!< maximum number of blocks written per write-queue loop int m_wqueue_threads; //!< number of threads writing blocks to disk int m_prefetch_max_blocks; //!< maximum number of blocks to prefetch per file @@ -282,7 +283,7 @@ public: //--------------------------------------------------------------------- //! Constructor //--------------------------------------------------------------------- - Cache(XrdSysLogger *logger); + Cache(XrdSysLogger *logger, XrdOucEnv *env); //--------------------------------------------------------------------- //! Obtain a new IO object that fronts existing XrdOucCacheIO. @@ -334,7 +335,7 @@ public: //--------------------------------------------------------------------- //! Singleton creation. //--------------------------------------------------------------------- - static Cache &CreateInstance(XrdSysLogger *logger); + static Cache &CreateInstance(XrdSysLogger *logger, XrdOucEnv *env); //--------------------------------------------------------------------- //! Singleton access. @@ -347,7 +348,12 @@ public: static bool VCheck(XrdVersionInfo &urVersion) { return true; } //--------------------------------------------------------------------- - //! Thread function running disk cache purge periodically. + //! Thread function checking resource usage periodically. + //--------------------------------------------------------------------- + void ResourceMonitorHeartBeat(); + + //--------------------------------------------------------------------- + //! Thread function invoked to scan and purge files from disk when needed. 
//--------------------------------------------------------------------- void Purge(); @@ -372,9 +378,8 @@ public: //--------------------------------------------------------------------- void ProcessWriteTasks(); - bool RequestRAMBlock(); - - void RAMBlockReleased(); + char* RequestRAM(long long size); + void ReleaseRAM(char* buf, long long size); void RegisterPrefetchFile(File*); void DeRegisterPrefetchFile(File*); @@ -383,7 +388,7 @@ public: void Prefetch(); - XrdOss* GetOss() const { return m_output_fs; } + XrdOss* GetOss() const { return m_oss; } bool IsFileActiveOrPurgeProtected(const std::string&); @@ -398,6 +403,8 @@ public: XrdSysError* GetLog() { return &m_log; } XrdSysTrace* GetTrace() { return m_trace; } + XrdXrootdGStream* GetGStream() { return m_gstream; } + void ExecuteCommandUrl(const std::string& command_url); static XrdScheduler *schedP; @@ -412,26 +419,31 @@ private: int UnlinkCommon(const std::string& f_name, bool fail_if_open); - static Cache *m_factory; //!< this object + static Cache *m_instance; //!< this object + XrdOucEnv *m_env; //!< environment passed in at creation XrdSysError m_log; //!< XrdPfc namespace logger XrdSysTrace *m_trace; const char *m_traceID; XrdOucCacheStats m_ouc_stats; //!< - XrdOss *m_output_fs; //!< disk cache file system + XrdOss *m_oss; //!< disk cache file system - std::vector m_decisionpoints; //!< decision plugins + XrdXrootdGStream *m_gstream; - std::map m_filesInQueue; + std::vector m_decisionpoints; //!< decision plugins Configuration m_configuration; //!< configurable parameters XrdSysCondVar m_prefetch_condVar; //!< lock for vector of prefetching files bool m_prefetch_enabled; //!< set to true when prefetching is enabled - XrdSysMutex m_RAMblock_mutex; //!< lock for allcoation of RAM blocks - int m_RAMblocks_used; + XrdSysMutex m_RAM_mutex; //!< lock for allcoation of RAM blocks + long long m_RAM_used; + long long m_RAM_write_queue; + std::list m_RAM_std_blocks; //!< A list of blocks of standard size, to be 
reused. + int m_RAM_std_size; + bool m_isClient; //!< True if running as client struct WriteQ @@ -453,11 +465,11 @@ private: typedef StatsMMap_t::iterator StatsMMap_i; typedef std::set FNameSet_t; - ActiveMap_t m_active; + ActiveMap_t m_active; //!< Map of currently active / open files. StatsMMap_t m_closed_files_stats; FNameSet_t m_purge_delay_set; bool m_in_purge; - XrdSysCondVar m_active_cond; + XrdSysCondVar m_active_cond; //!< Cond-var protecting active file data structures. void inc_ref_cnt(File*, bool lock, bool high_debug); void dec_ref_cnt(File*, bool high_debug); @@ -468,8 +480,18 @@ private: typedef std::vector PrefetchList; PrefetchList m_prefetchList; - // directory state for access / usage info and quotas - DataFsState *m_fs_state; + //--------------------------------------------------------------------------- + // Statistics, heart-beat, scan-and-purge + + enum ScanAndPurgeThreadState_e { SPTS_Idle, SPTS_Scan, SPTS_Purge, SPTS_Done }; + + XrdSysCondVar m_stats_n_purge_cond; //!< communication between heart-beat and scan-purge threads + + DataFsState *m_fs_state; //!< directory state for access / usage info and quotas + + int m_last_scan_duration; + int m_last_purge_duration; + ScanAndPurgeThreadState_e m_spt_state; void copy_out_active_stats_and_update_data_fs_state(); }; diff --git a/src/XrdPfc/XrdPfcConfiguration.cc b/src/XrdPfc/XrdPfcConfiguration.cc index 3af8dd0895a..a98a34af906 100644 --- a/src/XrdPfc/XrdPfcConfiguration.cc +++ b/src/XrdPfc/XrdPfcConfiguration.cc @@ -227,19 +227,19 @@ bool Cache::Config(const char *config_filename, const char *parameters) // Load OSS plugin. 
if (ofsCfg->Load(XrdOfsConfigPI::theOssLib)) { - ofsCfg->Plugin(m_output_fs); + ofsCfg->Plugin(m_oss); } else { TRACE(Error, "Cache::Config() Unable to create an OSS object"); - m_output_fs = 0; + m_oss = 0; return false; } // sets default value for disk usage XrdOssVSInfo sP; { - if (m_output_fs->StatVS(&sP, m_configuration.m_data_space.c_str(), 1) < 0) + if (m_oss->StatVS(&sP, m_configuration.m_data_space.c_str(), 1) < 0) { m_log.Emsg("Cache::ConfigParameters()", "error obtaining stat info for data space ", m_configuration.m_data_space.c_str()); return false; @@ -312,7 +312,8 @@ bool Cache::Config(const char *config_filename, const char *parameters) snprintf(buff, sizeof(buff), "RAM usage pfc.ram is not specified. Default value %s is used.", m_isClient ? "256m" : "1g"); m_log.Say("Config info: ", buff); } - m_configuration.m_NRamBuffers = static_cast(m_configuration.m_RamAbsAvailable / m_configuration.m_bufferSize); + // Setup number of standard-size blocks not released back to the system to 5% of total RAM. + m_configuration.m_RamKeepStdBlocks = (m_configuration.m_RamAbsAvailable / m_configuration.m_bufferSize + 1) * 5 / 100; // Set tracing to debug if this is set in environment @@ -383,12 +384,17 @@ bool Cache::Config(const char *config_filename, const char *parameters) m_log.Say(buff); } - m_log.Say("------ File Caching Proxy interface initialization ", retval ? "completed" : "failed"); + // Derived settings + m_prefetch_enabled = m_configuration.m_prefetch_max_blocks > 0; + Info::s_maxNumAccess = m_configuration.m_accHistorySize; - if (ofsCfg) delete ofsCfg; + m_gstream = (XrdXrootdGStream*) m_env->GetPtr("pfc.gStream*"); - // Broadcast settings as needed: - Info::s_maxNumAccess = m_configuration.m_accHistorySize; + m_log.Say("Config Proxy File Cache g-stream has", m_gstream ? "" : " NOT", " been configured via xrootd.monitor directive"); + + m_log.Say("------ Proxy File Cache configuration parsing ", retval ? 
"completed" : "failed"); + + if (ofsCfg) delete ofsCfg; return retval; } diff --git a/src/XrdPfc/XrdPfcFile.cc b/src/XrdPfc/XrdPfcFile.cc index 0eddb300586..42b92d441c1 100644 --- a/src/XrdPfc/XrdPfcFile.cc +++ b/src/XrdPfc/XrdPfcFile.cc @@ -54,44 +54,44 @@ File::File(const std::string& path, long long iOffset, long long iFileSize) : m_ref_cnt(0), m_is_open(false), m_in_shutdown(false), - m_output(0), - m_infoFile(0), + m_data_file(0), + m_info_file(0), m_cfi(Cache::GetInstance().GetTrace(), Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks > 0), m_filename(path), m_offset(iOffset), - m_fileSize(iFileSize), + m_file_size(iFileSize), m_current_io(m_io_map.end()), m_ios_in_detach(0), m_non_flushed_cnt(0), m_in_sync(false), - m_downloadCond(0), - m_prefetchState(kOff), - m_prefetchReadCnt(0), - m_prefetchHitCnt(0), - m_prefetchScore(1), - m_detachTimeIsLogged(false) + m_state_cond(0), + m_prefetch_state(kOff), + m_prefetch_read_cnt(0), + m_prefetch_hit_cnt(0), + m_prefetch_score(1), + m_detach_time_logged(false) { } File::~File() { - if (m_infoFile) + if (m_info_file) { TRACEF(Debug, "File::~File() close info "); - m_infoFile->Close(); - delete m_infoFile; - m_infoFile = NULL; + m_info_file->Close(); + delete m_info_file; + m_info_file = NULL; } - if (m_output) + if (m_data_file) { TRACEF(Debug, "File::~File() close output "); - m_output->Close(); - delete m_output; - m_output = NULL; + m_data_file->Close(); + delete m_data_file; + m_data_file = NULL; } - TRACEF(Debug, "File::~File() ended, prefetch score = " << m_prefetchScore); + TRACEF(Debug, "File::~File() ended, prefetch score = " << m_prefetch_score); } //------------------------------------------------------------------------------ @@ -123,13 +123,13 @@ void File::initiate_emergency_shutdown() // it happens. 
{ - XrdSysCondVarHelper _lck(m_downloadCond); + XrdSysCondVarHelper _lck(m_state_cond); m_in_shutdown = true; - if (m_prefetchState != kStopped && m_prefetchState != kComplete) + if (m_prefetch_state != kStopped && m_prefetch_state != kComplete) { - m_prefetchState = kStopped; + m_prefetch_state = kStopped; cache()->DeRegisterPrefetchFile(this); } } @@ -157,7 +157,7 @@ void File::BlockRemovedFromWriteQ(Block* b) { TRACEF(Dump, "File::BlockRemovedFromWriteQ() block = " << (void*) b << " idx= " << b->m_offset/m_cfi.GetBufferSize()); - XrdSysCondVarHelper _lck(m_downloadCond); + XrdSysCondVarHelper _lck(m_state_cond); dec_ref_count(b); } @@ -165,7 +165,7 @@ void File::BlocksRemovedFromWriteQ(std::list& blocks) { TRACEF(Dump, "File::BlocksRemovedFromWriteQ() n_blocks = " << blocks.size()); - XrdSysCondVarHelper _lck(m_downloadCond); + XrdSysCondVarHelper _lck(m_state_cond); for (std::list::iterator i = blocks.begin(); i != blocks.end(); ++i) { @@ -182,7 +182,7 @@ bool File::ioActive(IO *io) TRACEF(Debug, "File::ioActive start for io " << io); { - XrdSysCondVarHelper _lck(m_downloadCond); + XrdSysCondVarHelper _lck(m_state_cond); if ( ! m_is_open) { @@ -203,15 +203,14 @@ bool File::ioActive(IO *io) "\tio_map.size() " << m_io_map.size() << ", block_map.size() " << m_block_map.size() << ", file"); - // XXXX - // Intermediate check for 4.11 - 5.0 transition. - // Can be removed for 5.1, including the IODetals::m_ioactive_false_reported. + // XXX Intermediate check for 4.11 - 5.0 transition. + // Can be removed for 5.1, including member IODetals::m_ioactive_false_reported. assert( ! mi->second.m_ioactive_false_reported && "ioActive already returned false"); mi->second.m_allow_prefetching = false; // Check if any IO is still available for prfetching. If not, stop it. - if (m_prefetchState == kOn || m_prefetchState == kHold) + if (m_prefetch_state == kOn || m_prefetch_state == kHold) { if ( ! 
select_current_io_or_disable_prefetching(false) ) { @@ -255,8 +254,8 @@ bool File::ioActive(IO *io) void File::RequestSyncOfDetachStats() { - XrdSysCondVarHelper _lck(m_downloadCond); - m_detachTimeIsLogged = false; + XrdSysCondVarHelper _lck(m_state_cond); + m_detach_time_logged = false; } bool File::FinalizeSyncBeforeExit() @@ -264,14 +263,14 @@ bool File::FinalizeSyncBeforeExit() // Returns true if sync is required. // This method is called after corresponding IO is detached from PosixCache. - XrdSysCondVarHelper _lck(m_downloadCond); + XrdSysCondVarHelper _lck(m_state_cond); if (m_is_open && ! m_in_shutdown) { - if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detachTimeIsLogged) + if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged) { Stats loc_stats = m_stats.Clone(); m_cfi.WriteIOStatDetach(loc_stats); - m_detachTimeIsLogged = true; + m_detach_time_logged = true; m_in_sync = true; TRACEF(Debug, "File::FinalizeSyncBeforeExit requesting sync to write detach stats"); return true; @@ -291,7 +290,7 @@ void File::AddIO(IO *io) time_t now = time(0); - m_downloadCond.Lock(); + m_state_cond.Lock(); IoMap_i mi = m_io_map.find(io); @@ -300,9 +299,9 @@ void File::AddIO(IO *io) m_io_map.insert(std::make_pair(io, IODetails(now))); m_stats.IoAttach(); - if (m_prefetchState == kStopped) + if (m_prefetch_state == kStopped) { - m_prefetchState = kOn; + m_prefetch_state = kOn; cache()->RegisterPrefetchFile(this); } } @@ -311,7 +310,7 @@ void File::AddIO(IO *io) TRACEF(Error, "File::AddIO() io = " << (void*)io << " already registered."); } - m_downloadCond.UnLock(); + m_state_cond.UnLock(); } //------------------------------------------------------------------------------ @@ -324,7 +323,7 @@ void File::RemoveIO(IO *io) time_t now = time(0); - m_downloadCond.Lock(); + m_state_cond.Lock(); IoMap_i mi = m_io_map.find(io); @@ -339,10 +338,10 @@ void File::RemoveIO(IO *io) m_io_map.erase(mi); --m_ios_in_detach; - if 
(m_io_map.empty() && m_prefetchState != kStopped && m_prefetchState != kComplete) + if (m_io_map.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete) { TRACEF(Error, "File::RemoveIO() io = " << (void*)io << " Prefetching is not stopped/complete -- it should be by now."); - m_prefetchState = kStopped; + m_prefetch_state = kStopped; cache()->DeRegisterPrefetchFile(this); } } @@ -351,7 +350,7 @@ void File::RemoveIO(IO *io) TRACEF(Error, "File::RemoveIO() io = " << (void*)io << " is NOT registered."); } - m_downloadCond.UnLock(); + m_state_cond.UnLock(); } //------------------------------------------------------------------------------ @@ -381,7 +380,7 @@ bool File::Open() bool info_existed = (myOss.Stat(ifn.c_str(), &info_stat) == XrdOssOK); // Create the data file itself. - char size_str[32]; sprintf(size_str, "%lld", m_fileSize); + char size_str[32]; sprintf(size_str, "%lld", m_file_size); myEnv.Put("oss.asize", size_str); myEnv.Put("oss.cgroup", conf.m_data_space.c_str()); @@ -394,12 +393,12 @@ bool File::Open() return false; } - m_output = myOss.newFile(myUser); - if ((res = m_output->Open(m_filename.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK) + m_data_file = myOss.newFile(myUser); + if ((res = m_data_file->Open(m_filename.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK) { TRACEF(Error, "File::Open() Open failed " << ERRNO_AND_ERRSTR(-res)); errno = -res; - delete m_output; m_output = 0; + delete m_data_file; m_data_file = 0; return false; } @@ -409,23 +408,23 @@ bool File::Open() { TRACE(Error, "File::Open() Create failed for info file " << ifn << ERRNO_AND_ERRSTR(-res)); errno = -res; - m_output->Close(); delete m_output; m_output = 0; + m_data_file->Close(); delete m_data_file; m_data_file = 0; return false; } - m_infoFile = myOss.newFile(myUser); - if ((res = m_infoFile->Open(ifn.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK) + m_info_file = myOss.newFile(myUser); + if ((res = m_info_file->Open(ifn.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK) { 
TRACEF(Error, "File::Open() Open failed for info file " << ifn << ERRNO_AND_ERRSTR(-res)); errno = -res; - delete m_infoFile; m_infoFile = 0; - m_output->Close(); delete m_output; m_output = 0; + delete m_info_file; m_info_file = 0; + m_data_file->Close(); delete m_data_file; m_data_file = 0; return false; } bool initialize_info_file = true; - if (info_existed && m_cfi.Read(m_infoFile, ifn)) + if (info_existed && m_cfi.Read(m_info_file, ifn)) { TRACEF(Debug, "Open - reading existing info file. (data_existed=" << data_existed << ", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) << @@ -445,18 +444,18 @@ bool File::Open() if (initialize_info_file) { m_cfi.SetBufferSize(conf.m_bufferSize); - m_cfi.SetFileSize(m_fileSize); - m_cfi.Write(m_infoFile); - m_infoFile->Fsync(); - int ss = (m_fileSize - 1)/m_cfi.GetBufferSize() + 1; - TRACEF(Debug, "Creating new file info, data size = " << m_fileSize << " num blocks = " << ss); + m_cfi.SetFileSize(m_file_size); + m_cfi.Write(m_info_file); + m_info_file->Fsync(); + int ss = (m_file_size - 1)/m_cfi.GetBufferSize() + 1; + TRACEF(Debug, "Creating new file info, data size = " << m_file_size << " num blocks = " << ss); } m_cfi.WriteIOStatAttach(); - m_downloadCond.Lock(); + m_state_cond.Lock(); m_is_open = true; - m_prefetchState = (m_cfi.IsComplete()) ? kComplete : kStopped; // Will engage in AddIO(). - m_downloadCond.UnLock(); + m_prefetch_state = (m_cfi.IsComplete()) ? kComplete : kStopped; // Will engage in AddIO(). + m_state_cond.UnLock(); return true; } @@ -511,23 +510,31 @@ Block* File::PrepareBlockRequest(int i, IO *io, bool prefetch) const int last_block = m_cfi.GetSizeInBits() - 1; long long off = i * BS; - long long this_bs = (i == last_block) ? m_fileSize - off : BS; + long long this_bs = (i == last_block) ? m_file_size - off : BS; - // 1. Should blocks be reused to avoid recreation? 
There is block pool in Xrd - // 2, Memalign to page size - Block *b = new (std::nothrow) Block(this, io, off, this_bs, prefetch); + Block *b = 0; + char *buf = cache()->RequestRAM(this_bs); - if (b) + if (buf) { - m_block_map[i] = b; + b = new (std::nothrow) Block(this, io, buf, off, this_bs, prefetch); - // Actual Read request is issued in ProcessBlockRequests(). - TRACEF(Dump, "File::PrepareBlockRequest() " << i << " prefetch " << prefetch << " address " << (void*) b); + if (b) + { + m_block_map[i] = b; + + // Actual Read request is issued in ProcessBlockRequests(). + TRACEF(Dump, "File::PrepareBlockRequest() " << i << " prefetch " << prefetch << " address " << (void*) b); - if (m_prefetchState == kOn && (int) m_block_map.size() >= Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks) + if (m_prefetch_state == kOn && (int) m_block_map.size() >= Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks) + { + m_prefetch_state = kHold; + cache()->DeRegisterPrefetchFile(this); + } + } + else { - m_prefetchState = kHold; - cache()->DeRegisterPrefetchFile(this); + TRACEF(Dump, "File::PrepareBlockRequest() " << i << " prefetch " << prefetch << ", allocation failed."); } } @@ -604,7 +611,7 @@ int File::ReadBlocksFromDisk(std::list& blocks, overlap(*ii, BS, req_off, req_size, off, blk_off, size); - long long rs = m_output->Read(req_buf + off, *ii * BS + blk_off -m_offset, size); + long long rs = m_data_file->Read(req_buf + off, *ii * BS + blk_off -m_offset, size); TRACEF(Dump, "File::ReadBlocksFromDisk block idx = " << *ii << " size= " << size); if (rs < 0) @@ -651,17 +658,17 @@ int File::Read(IO *io, char* iUserBuff, long long iUserOff, int iUserSize) // assess if passing the req to client is actually better. // unlock - m_downloadCond.Lock(); + m_state_cond.Lock(); if ( ! 
m_is_open) { - m_downloadCond.UnLock(); + m_state_cond.UnLock(); TRACEF(Error, "File::Read file is not open"); return io->GetInput()->Read(iUserBuff, iUserOff, iUserSize); } if (m_in_shutdown) { - m_downloadCond.UnLock(); + m_state_cond.UnLock(); return -ENOENT; } @@ -687,8 +694,8 @@ int File::Read(IO *io, char* iUserBuff, long long iUserOff, int iUserSize) else { // Is there room for one more RAM Block? - Block *b; - if (cache()->RequestRAMBlock() && (b = PrepareBlockRequest(block_idx, io, false)) != 0) + Block *b = PrepareBlockRequest(block_idx, io, false); + if (b) { TRACEF(Dump, "File::Read() inc_ref_count new " << (void*)iUserBuff << " idx = " << block_idx); inc_ref_count(b); @@ -705,7 +712,7 @@ int File::Read(IO *io, char* iUserBuff, long long iUserOff, int iUserSize) } } - m_downloadCond.UnLock(); + m_state_cond.UnLock(); ProcessBlockRequests(blks_to_request, false); @@ -750,7 +757,7 @@ int File::Read(IO *io, char* iUserBuff, long long iUserOff, int iUserSize) BlockList_t finished; BlockList_t to_reissue; { - XrdSysCondVarHelper _lck(m_downloadCond); + XrdSysCondVarHelper _lck(m_state_cond); BlockList_i bi = blks_to_process.begin(); while (bi != blks_to_process.end()) @@ -779,7 +786,7 @@ int File::Read(IO *io, char* iUserBuff, long long iUserOff, int iUserSize) if (finished.empty() && to_reissue.empty()) { - m_downloadCond.Wait(); + m_state_cond.Wait(); continue; } } @@ -861,7 +868,7 @@ int File::Read(IO *io, char* iUserBuff, long long iUserOff, int iUserSize) // Last, stamp and release blocks, release file. { - XrdSysCondVarHelper _lck(m_downloadCond); + XrdSysCondVarHelper _lck(m_state_cond); // blks_to_process can be non-empty, if we're exiting with an error. 
std::copy(blks_to_process.begin(), blks_to_process.end(), std::back_inserter(blks_processed)); @@ -873,13 +880,13 @@ int File::Read(IO *io, char* iUserBuff, long long iUserOff, int iUserSize) } // update prefetch score - m_prefetchHitCnt += prefetchHitsRam; + m_prefetch_hit_cnt += prefetchHitsRam; for (IntList_i d = blks_on_disk.begin(); d != blks_on_disk.end(); ++d) { if (m_cfi.TestBitPrefetch(offsetIdx(*d))) - m_prefetchHitCnt++; + m_prefetch_hit_cnt++; } - m_prefetchScore = float(m_prefetchHitCnt)/m_prefetchReadCnt; + m_prefetch_score = float(m_prefetch_hit_cnt)/m_prefetch_read_cnt; } m_stats.AddReadStats(loc_stats); @@ -893,10 +900,10 @@ void File::WriteBlockToDisk(Block* b) { // write block buffer into disk file long long offset = b->m_offset - m_offset; - long long size = (offset + m_cfi.GetBufferSize()) > m_fileSize ? (m_fileSize - offset) : m_cfi.GetBufferSize(); + long long size = (offset + m_cfi.GetBufferSize()) > m_file_size ? (m_file_size - offset) : m_cfi.GetBufferSize(); const char *buff = &b->m_buff[0]; - ssize_t retval = m_output->Write(buff, offset, size); + ssize_t retval = m_data_file->Write(buff, offset, size); if (retval < size) { @@ -909,7 +916,7 @@ void File::WriteBlockToDisk(Block* b) TRACEF(Error, "File::WriteToDisk() incomplete block write ret=" << retval << " (should be " << size << ")"); } - XrdSysCondVarHelper _lck(m_downloadCond); + XrdSysCondVarHelper _lck(m_state_cond); dec_ref_count(b); @@ -923,7 +930,7 @@ void File::WriteBlockToDisk(Block* b) bool schedule_sync = false; { - XrdSysCondVarHelper _lck(m_downloadCond); + XrdSysCondVarHelper _lck(m_state_cond); m_cfi.SetBitWritten(blk_idx); @@ -964,14 +971,14 @@ void File::Sync() { TRACEF(Dump, "File::Sync()"); - int ret = m_output->Fsync(); + int ret = m_data_file->Fsync(); bool errorp = false; if (ret == XrdOssOK) { Stats loc_stats = m_stats.Clone(); m_cfi.WriteIOStat(loc_stats); - m_cfi.Write(m_infoFile); - int cret = m_infoFile->Fsync(); + m_cfi.Write(m_info_file); + int cret = 
m_info_file->Fsync(); if (cret != XrdOssOK) { TRACEF(Error, "File::Sync cinfo file sync error " << cret); @@ -991,7 +998,7 @@ void File::Sync() // Unlink will also call this->initiate_emergency_shutdown() Cache::GetInstance().Unlink(m_filename.c_str()); - XrdSysCondVarHelper _lck(&m_downloadCond); + XrdSysCondVarHelper _lck(&m_state_cond); m_writes_during_sync.clear(); m_in_sync = false; @@ -1001,7 +1008,7 @@ void File::Sync() int written_while_in_sync; { - XrdSysCondVarHelper _lck(&m_downloadCond); + XrdSysCondVarHelper _lck(&m_state_cond); for (std::vector::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i) { m_cfi.SetBitSynced(*i); @@ -1051,13 +1058,13 @@ void File::free_block(Block* b) } else { + cache()->ReleaseRAM(b->m_buff, b->m_size); delete b; - cache()->RAMBlockReleased(); } - if (m_prefetchState == kHold && (int) m_block_map.size() < Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks) + if (m_prefetch_state == kHold && (int) m_block_map.size() < Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks) { - m_prefetchState = kOn; + m_prefetch_state = kOn; cache()->RegisterPrefetchFile(this); } } @@ -1101,7 +1108,7 @@ bool File::select_current_io_or_disable_prefetching(bool skip_current) if ( ! io_ok) { m_current_io = m_io_map.end(); - m_prefetchState = kStopped; + m_prefetch_state = kStopped; cache()->DeRegisterPrefetchFile(this); } @@ -1112,7 +1119,7 @@ bool File::select_current_io_or_disable_prefetching(bool skip_current) void File::ProcessBlockResponse(BlockResponseHandler* brh, int res) { - XrdSysCondVarHelper _lck(m_downloadCond); + XrdSysCondVarHelper _lck(m_state_cond); Block *b = brh->m_block; @@ -1133,7 +1140,7 @@ void File::ProcessBlockResponse(BlockResponseHandler* brh, int res) mi->second.m_allow_prefetching = false; // Check if any IO is still available for prfetching. If not, stop it. 
- if (m_prefetchState == kOn || m_prefetchState == kHold) + if (m_prefetch_state == kOn || m_prefetch_state == kHold) { if ( ! select_current_io_or_disable_prefetching(false) ) { @@ -1173,7 +1180,7 @@ void File::ProcessBlockResponse(BlockResponseHandler* brh, int res) b->set_error(res); } - m_downloadCond.Broadcast(); + m_state_cond.Broadcast(); } long long File::BufferSize() @@ -1208,9 +1215,9 @@ void File::Prefetch() TRACEF(Dump, "File::Prefetch enter to check download status"); { - XrdSysCondVarHelper _lck(m_downloadCond); + XrdSysCondVarHelper _lck(m_state_cond); - if (m_prefetchState != kOn) + if (m_prefetch_state != kOn) { return; } @@ -1231,11 +1238,20 @@ void File::Prefetch() BlockMap_i bi = m_block_map.find(f_act); if (bi == m_block_map.end()) { - TRACEF(Dump, "File::Prefetch take block " << f_act); - cache()->RequestRAMBlock(); - blks.push_back( PrepareBlockRequest(f_act, m_current_io->first, true) ); - m_prefetchReadCnt++; - m_prefetchScore = float(m_prefetchHitCnt)/m_prefetchReadCnt; + Block *b = PrepareBlockRequest(f_act, m_current_io->first, true); + if (b) + { + TRACEF(Dump, "File::Prefetch take block " << f_act); + blks.push_back(b); + // Note: block ref_cnt not increased, it will be when placed into write queue. + m_prefetch_read_cnt++; + m_prefetch_score = float(m_prefetch_hit_cnt)/m_prefetch_read_cnt; + } + else + { + // This shouldn't happen as prefetching stops when RAM is 70% full. 
+ TRACEF(Warning, "File::Prefetch allocation failed for block " << f_act); + } break; } } @@ -1244,7 +1260,7 @@ void File::Prefetch() if (blks.empty()) { TRACEF(Debug, "File::Prefetch file is complete, stopping prefetch."); - m_prefetchState = kComplete; + m_prefetch_state = kComplete; cache()->DeRegisterPrefetchFile(this); } else @@ -1264,7 +1280,7 @@ void File::Prefetch() float File::GetPrefetchScore() const { - return m_prefetchScore; + return m_prefetch_score; } XrdSysError* File::GetLog() diff --git a/src/XrdPfc/XrdPfcFile.hh b/src/XrdPfc/XrdPfcFile.hh index 7f60c91a017..44db8b5c952 100644 --- a/src/XrdPfc/XrdPfcFile.hh +++ b/src/XrdPfc/XrdPfcFile.hh @@ -60,32 +60,31 @@ class File; class Block { public: - std::vector m_buff; - long long m_offset; File *m_file; IO *m_io; // IO that handled current request, used for == / != comparisons only + char *m_buff; + long long m_offset; + int m_size; int m_refcnt; int m_errno; // stores negative errno bool m_downloaded; bool m_prefetch; - Block(File *f, IO *io, long long off, int size, bool m_prefetch) : - m_offset(off), m_file(f), m_io(io), m_refcnt(0), - m_errno(0), m_downloaded(false), m_prefetch(m_prefetch) - { - m_buff.resize(size); - } + Block(File *f, IO *io, char *buf, long long off, int size, bool m_prefetch) : + m_file(f), m_io(io), m_buff(buf), m_offset(off), m_size(size), + m_refcnt(0), m_errno(0), m_downloaded(false), m_prefetch(m_prefetch) + {} - char* get_buff(long long pos = 0) { return &m_buff[pos]; } - int get_size() { return (int) m_buff.size(); } - long long get_offset() { return m_offset; } + char* get_buff() { return m_buff; } + int get_size() { return m_size; } + long long get_offset() { return m_offset; } IO* get_io() const { return m_io; } - bool is_finished() { return m_downloaded || m_errno != 0; } - bool is_ok() { return m_downloaded; } - bool is_failed() { return m_errno != 0; } + bool is_finished() { return m_downloaded || m_errno != 0; } + bool is_ok() { return m_downloaded; } + bool 
is_failed() { return m_errno != 0; } void set_downloaded() { m_downloaded = true; } void set_error(int err) { m_errno = err; } @@ -208,7 +207,7 @@ public: XrdSysError* GetLog(); XrdSysTrace* GetTrace(); - long long GetFileSize() { return m_fileSize; } + long long GetFileSize() { return m_file_size; } void AddIO(IO *io); int GetPrefetchCountOnIO(IO *io); @@ -217,6 +216,12 @@ public: Stats DeltaStatsFromLastCall(); + const Info::AStat* GetLastAccessStats() const { return m_cfi.GetLastAccessStats(); } + size_t GetAccessCnt() const { return m_cfi.GetAccessCnt(); } + int GetBlockSize() const { return m_cfi.GetBufferSize(); } + int GetNBlocks() const { return m_cfi.GetSizeInBits(); } + int GetNDownloadedBlocks() const { return m_cfi.GetNDownloadedBlocks(); } + // These three methods are called under Cache's m_active lock int get_ref_cnt() { return m_ref_cnt; } int inc_ref_cnt() { return ++m_ref_cnt; } @@ -233,13 +238,13 @@ private: bool m_is_open; //!< open state (presumably not needed anymore) bool m_in_shutdown; //!< file is in emergency shutdown due to irrecoverable error or unlink request - XrdOssDF *m_output; //!< file handle for data file on disk - XrdOssDF *m_infoFile; //!< file handle for data-info file on disk + XrdOssDF *m_data_file; //!< file handle for data file on disk + XrdOssDF *m_info_file; //!< file handle for data-info file on disk Info m_cfi; //!< download status of file blocks and access statistics std::string m_filename; //!< filename of data file on disk long long m_offset; //!< offset of cached file for block-based / hdfs operation - long long m_fileSize; //!< size of cached disk file for block-based operation + long long m_file_size; //!< size of cached disk file for block-based operation // IO objects attached to this file. 
@@ -285,18 +290,18 @@ private: BlockMap_t m_block_map; - XrdSysCondVar m_downloadCond; + XrdSysCondVar m_state_cond; Stats m_stats; //!< cache statistics for this instance Stats m_last_stats; //!< copy of cache stats during last purge cycle, used for per directory stat reporting - PrefetchState_e m_prefetchState; + PrefetchState_e m_prefetch_state; - int m_prefetchReadCnt; - int m_prefetchHitCnt; - float m_prefetchScore; // cached + int m_prefetch_read_cnt; + int m_prefetch_hit_cnt; + float m_prefetch_score; // cached - bool m_detachTimeIsLogged; + bool m_detach_time_logged; static const char *m_traceID; diff --git a/src/XrdPfc/XrdPfcIOFileBlock.cc b/src/XrdPfc/XrdPfcIOFileBlock.cc index a7dc0fb9121..c9f3587e13b 100644 --- a/src/XrdPfc/XrdPfcIOFileBlock.cc +++ b/src/XrdPfc/XrdPfcIOFileBlock.cc @@ -37,7 +37,7 @@ using namespace XrdPfc; //______________________________________________________________________________ IOFileBlock::IOFileBlock(XrdOucCacheIO *io, XrdOucCacheStats &statsGlobal, Cache & cache) : - IO(io, statsGlobal, cache), m_localStat(0), m_info(cache.GetTrace(), false), m_infoFile(0) + IO(io, statsGlobal, cache), m_localStat(0), m_info(cache.GetTrace(), false), m_info_file(0) { m_blocksize = Cache::GetInstance().RefConfiguration().m_hdfsbsize; GetBlockSizeFromPath(); @@ -108,7 +108,7 @@ void IOFileBlock::CloseInfoFile() { // write access statistics to info file and close it // detach time is needed for file purge - if (m_infoFile) + if (m_info_file) { if (m_info.GetFileSize() > 0) { @@ -116,12 +116,12 @@ void IOFileBlock::CloseInfoFile() Stats as; m_info.WriteIOStatDetach(as); } - m_info.Write(m_infoFile); - m_infoFile->Fsync(); - m_infoFile->Close(); + m_info.Write(m_info_file); + m_info_file->Fsync(); + m_info_file->Close(); - delete m_infoFile; - m_infoFile = 0; + delete m_info_file; + m_info_file = 0; } } @@ -208,10 +208,10 @@ int IOFileBlock::initLocalStat() // try to read from existing file if (m_cache.GetOss()->Stat(path.c_str(), &tmpStat) == 
XrdOssOK) { - m_infoFile = m_cache.GetOss()->newFile(m_cache.RefConfiguration().m_username.c_str()); - if (m_infoFile->Open(path.c_str(), O_RDWR, 0600, myEnv) == XrdOssOK) + m_info_file = m_cache.GetOss()->newFile(m_cache.RefConfiguration().m_username.c_str()); + if (m_info_file->Open(path.c_str(), O_RDWR, 0600, myEnv) == XrdOssOK) { - if (m_info.Read(m_infoFile, path)) + if (m_info.Read(m_info_file, path)) { tmpStat.st_size = m_info.GetFileSize(); TRACEIO(Info, "IOFileBlock::initCachedStat successfuly read size from existing info file = " << tmpStat.st_size); @@ -228,7 +228,7 @@ int IOFileBlock::initLocalStat() // if there is no local info file, try to read from client and then save stat into a new *cinfo file if (res) { - if (m_infoFile) { delete m_infoFile; m_infoFile = 0; } + if (m_info_file) { delete m_info_file; m_info_file = 0; } res = GetInput()->Fstat(tmpStat); TRACEIO(Debug, "IOFileBlock::initCachedStat get stat from client res= " << res << "size = " << tmpStat.st_size); @@ -236,8 +236,8 @@ int IOFileBlock::initLocalStat() { if (m_cache.GetOss()->Create(m_cache.RefConfiguration().m_username.c_str(), path.c_str(), 0600, myEnv, XRDOSS_mkpath) == XrdOssOK) { - m_infoFile = m_cache.GetOss()->newFile(m_cache.RefConfiguration().m_username.c_str()); - if (m_infoFile->Open(path.c_str(), O_RDWR, 0600, myEnv) == XrdOssOK) + m_info_file = m_cache.GetOss()->newFile(m_cache.RefConfiguration().m_username.c_str()); + if (m_info_file->Open(path.c_str(), O_RDWR, 0600, myEnv) == XrdOssOK) { // This is writing the top-level cinfo // The info file is used to get file size on defer open @@ -245,8 +245,8 @@ int IOFileBlock::initLocalStat() m_info.SetBufferSize(m_cache.RefConfiguration().m_bufferSize); m_info.DisableDownloadStatus(); m_info.SetFileSize(tmpStat.st_size); - m_info.Write(m_infoFile, path); - m_infoFile->Fsync(); + m_info.Write(m_info_file, path); + m_info_file->Fsync(); } else { diff --git a/src/XrdPfc/XrdPfcIOFileBlock.hh b/src/XrdPfc/XrdPfcIOFileBlock.hh index 
9c1124e5102..1a4a795f0fa 100644 --- a/src/XrdPfc/XrdPfcIOFileBlock.hh +++ b/src/XrdPfc/XrdPfcIOFileBlock.hh @@ -67,7 +67,7 @@ private: XrdSysMutex m_mutex; //!< map mutex struct stat *m_localStat; Info m_info; - XrdOssDF* m_infoFile; + XrdOssDF* m_info_file; void GetBlockSizeFromPath(); int initLocalStat(); diff --git a/src/XrdPfc/XrdPfcInfo.cc b/src/XrdPfc/XrdPfcInfo.cc index 92d6bb09d2b..3cf3e36d63a 100644 --- a/src/XrdPfc/XrdPfcInfo.cc +++ b/src/XrdPfc/XrdPfcInfo.cc @@ -143,15 +143,15 @@ void Info::SetAllBitsSynced() void Info::SetBufferSize(long long bs) { // Needed only info is created first time in File::Open() - m_store.m_bufferSize = bs; + m_store.m_buffer_size = bs; } //------------------------------------------------------------------------------s void Info::SetFileSize(long long fs) { - m_store.m_fileSize = fs; - ResizeBits((m_store.m_fileSize - 1) / m_store.m_bufferSize + 1); + m_store.m_file_size = fs; + ResizeBits((m_store.m_file_size - 1) / m_store.m_buffer_size + 1); m_store.m_creationTime = time(0); } @@ -213,7 +213,7 @@ bool Info::Read(XrdOssDF* fp, const std::string &fname) } } - if (r.Read(m_store.m_bufferSize)) return false; + if (r.Read(m_store.m_buffer_size)) return false; long long fs; if (r.Read(fs)) return false; @@ -300,8 +300,8 @@ bool Info::Write(XrdOssDF* fp, const std::string &fname) m_store.m_version = s_defaultVersion; if (w.Write(m_store.m_version)) return false; - if (w.Write(m_store.m_bufferSize)) return false; - if (w.Write(m_store.m_fileSize)) return false; + if (w.Write(m_store.m_buffer_size)) return false; + if (w.Write(m_store.m_file_size)) return false; if (w.WriteRaw(m_store.m_buff_synced, GetSizeInBytes())) return false; @@ -456,6 +456,10 @@ bool Info::GetLatestDetachTime(time_t& t) const return t != 0; } +const Info::AStat* Info::GetLastAccessStats() const +{ + return m_store.m_astats.empty() ? 
0 : & m_store.m_astats.back(); +} //============================================================================== // Support for reading of previous cinfo versions @@ -478,7 +482,7 @@ bool Info::ReadV2(XrdOssDF* fp, const std::string &fname) FpHelper r(fp, 0, m_trace, m_traceID, trace_pfx + "oss read failed"); if (r.Read(m_store.m_version)) return false; - if (r.Read(m_store.m_bufferSize)) return false; + if (r.Read(m_store.m_buffer_size)) return false; long long fs; if (r.Read(fs)) return false; @@ -553,7 +557,7 @@ bool Info::ReadV1(XrdOssDF* fp, const std::string &fname) FpHelper r(fp, 0, m_trace, m_traceID, trace_pfx + "oss read failed"); if (r.Read(m_store.m_version)) return false; - if (r.Read(m_store.m_bufferSize)) return false; + if (r.Read(m_store.m_buffer_size)) return false; long long fs; if (r.Read(fs)) return false; diff --git a/src/XrdPfc/XrdPfcInfo.hh b/src/XrdPfc/XrdPfcInfo.hh index dafd70f6963..5f2ef28db0f 100644 --- a/src/XrdPfc/XrdPfcInfo.hh +++ b/src/XrdPfc/XrdPfcInfo.hh @@ -48,7 +48,7 @@ class Stats; class Info { public: - // !Access statistics + //! 
Access statistics struct AStat { time_t AttachTime; //!< open time @@ -71,15 +71,15 @@ public: struct Store { int m_version; //!< info version - long long m_bufferSize; //!< prefetch buffer size - long long m_fileSize; //!< number of file blocks + long long m_buffer_size; //!< prefetch buffer size + long long m_file_size; //!< size of the file (number of blocks is derived from it) unsigned char *m_buff_synced; //!< disk written state vector char m_cksum[16]; //!< cksum of downloaded information time_t m_creationTime; //!< time the info file was created size_t m_accessCnt; //!< total access count for the file std::vector m_astats; //!< access records - Store () : m_version(1), m_bufferSize(-1), m_fileSize(0), m_buff_synced(0), m_creationTime(0), m_accessCnt(0) {} + Store () : m_version(1), m_buffer_size(-1), m_file_size(0), m_buff_synced(0), m_creationTime(0), m_accessCnt(0) {} }; @@ -128,7 +128,7 @@ public: void SetFileSize(long long); //--------------------------------------------------------------------- - //! \brief Reserve buffer for fileSize/bufferSize bytes + //! \brief Reserve buffer for file_size / buffer_size bytes //! //! @param n number of file blocks //--------------------------------------------------------------------- @@ -215,6 +215,11 @@ public: //--------------------------------------------------------------------- bool GetLatestDetachTime(time_t& t) const; + //--------------------------------------------------------------------- + //! Get latest access stats + //--------------------------------------------------------------------- + const AStat* GetLastAccessStats() const; + //--------------------------------------------------------------------- //! Get prefetch buffer size //--------------------------------------------------------------------- @@ -253,7 +258,7 @@ public: //--------------------------------------------------------------------- //! 
Get number of accesses //--------------------------------------------------------------------- - size_t GetAccessCnt() { return m_store.m_accessCnt; } + size_t GetAccessCnt() const { return m_store.m_accessCnt; } //--------------------------------------------------------------------- //! Get version @@ -362,7 +367,7 @@ inline int Info::GetNDownloadedBlocks() const inline long long Info::GetNDownloadedBytes() const { - return m_store.m_bufferSize * GetNDownloadedBlocks(); + return m_store.m_buffer_size * GetNDownloadedBlocks(); } inline int Info::GetLastDownloadedBlock() const @@ -377,9 +382,9 @@ inline long long Info::GetExpectedDataFileSize() const { int last_block = GetLastDownloadedBlock(); if (last_block == m_sizeInBits - 1) - return m_store.m_fileSize; + return m_store.m_file_size; else - return (last_block + 1) * m_store.m_bufferSize; + return (last_block + 1) * m_store.m_buffer_size; } inline int Info::GetSizeInBytes() const @@ -397,7 +402,7 @@ inline int Info::GetSizeInBits() const inline long long Info::GetFileSize() const { - return m_store.m_fileSize; + return m_store.m_file_size; } inline bool Info::IsComplete() const @@ -422,7 +427,7 @@ inline void Info::UpdateDownloadCompleteStatus() inline long long Info::GetBufferSize() const { - return m_store.m_bufferSize; + return m_store.m_buffer_size; } } diff --git a/src/XrdPfc/XrdPfcPurge.cc b/src/XrdPfc/XrdPfcPurge.cc index 1ffc790c16c..a64d0bf844c 100644 --- a/src/XrdPfc/XrdPfcPurge.cc +++ b/src/XrdPfc/XrdPfcPurge.cc @@ -40,7 +40,7 @@ class DirState int m_depth; int m_max_depth; - bool m_stat_report; // storing of stats required + bool m_stat_report; // not used - storing of stats required typedef std::map DsMap_t; typedef DsMap_t::iterator DsMap_i; @@ -222,7 +222,7 @@ class FPurgeState std::string path; long long nBytes; time_t time; - DirState *dirState; + DirState *dirState; // XXXX if this is stored, why is it not used later in purge? 
FS(const std::string& p, long long n, time_t t, DirState *ds) : path(p), nBytes(n), time(t), dirState(ds) @@ -463,9 +463,40 @@ class FPurgeState }; -} // end XrdPfc namespace + +//============================================================================== +// ResourceMonitor +//============================================================================== + +// Encapsulates local variables used within the previous mega-function Purge(). +// +// This will be used within the continuously/periodically run heart-beat / breath +// function ... and then parts of it will be passed to invoked FS scan and purge +// jobs (which will be controlled through this as well). + +class ResourceMonitor +{ + +}; +//============================================================================== +// +//============================================================================== + +namespace +{ + +class ScanAndPurgeJob : public XrdJob +{ +public: + ScanAndPurgeJob(const char *desc = "") : XrdJob(desc) {} + + void DoIt() {} // { Cache::GetInstance().ScanAndPurge(); } +}; + +} + //============================================================================== // Cache methods //============================================================================== @@ -478,6 +509,7 @@ void Cache::copy_out_active_stats_and_update_data_fs_state() { XrdSysCondVarHelper lock(&m_active_cond); + // Slurp in stats from files closed since last cycle. updates.swap( m_closed_files_stats ); for (ActiveMap_i i = m_active.begin(); i != m_active.end(); ++i) @@ -505,6 +537,98 @@ void Cache::copy_out_active_stats_and_update_data_fs_state() } +//============================================================================== + +void Cache::ResourceMonitorHeartBeat() +{ + // static const char *trc_pfx = "Cache::ResourceMonitorHeartBeat() "; + + // Pause before initial run + sleep(1); + + // XXXX Setup initial / constant stats (total RAM, total disk, ???) 
+ + XrdOucCacheStats &S = Statistics; + XrdOucCacheStats::CacheStats &X = Statistics.X; + + S.Lock(); + + X.DiskSize = m_configuration.m_diskTotalSpace; + + X.MemSize = m_configuration.m_RamAbsAvailable; + + S.UnLock(); + + // XXXX Schedule initial disk scan, time it! + // + // TRACE(Info, trc_pfx << "scheduling initial disk scan."); + // schedP->Schedule( new ScanAndPurgeJob("XrdPfc::ScanAndPurge") ); + // + // bool scan_and_purge_running = true; + + // XXXX Could we really hold last-usage for all files in memory? + + // XXXX Think how to handle disk-full, scan/purge not finishing: + // - start dropping things out of write queue, but only when RAM gets near full; + // - monitoring this then becomes a high-priority job, inner loop with sleep of, + // say, 5 or 10 seconds. + + while (true) + { + time_t heartbeat_start = time(0); + + // TRACE(Info, trc_pfx << "HeartBeat starting ..."); + + // if summary monitoring configured, populate OucCacheStats: + S.Lock(); + + // - available / used disk space (files usage calculated elsewhere (maybe)) + + // - RAM usage + { XrdSysMutexHelper lck(&m_RAM_mutex); + X.MemUsed = m_RAM_used; + X.MemWriteQ = m_RAM_write_queue; + } + // - files opened / closed etc + + // do estimate of available space + S.UnLock(); + + // if needed, schedule purge in a different thread. + // purge is: + // - deep scan + gather FSPurgeState + // - actual purge + // + // this thread can continue running and, if needed, stop writing to disk + // if purge is taking too long. + + // think how data is passed / synchronized between this and purge thread + + // !!!! think how stat collection is done and propagated upwards; + // until now it was done once per purge-interval. + // now stats will be added up more often, but purge will be done + // only occasionally. + // also, do we report cumulative values or deltas? cumulative should + // be easier and consistent with summary data. + // still, some are state - like disk usage, num of files. 
+ + // Do we take care of directories that need to be newly added into DirState hierarchy? + // I.e., when user creates new directories and these are covered by either full + // spec or by root + depth declaration. + + int heartbeat_duration = time(0) - heartbeat_start; + + // TRACE(Info, trc_pfx << "HeartBeat finished, heartbeat_duration " << heartbeat_duration); + + // int sleep_time = m_configuration.m_purgeInterval - heartbeat_duration; + int sleep_time = 60 - heartbeat_duration; + if (sleep_time > 0) + { + sleep(sleep_time); + } + } +} + //============================================================================== void Cache::Purge() @@ -512,7 +636,6 @@ void Cache::Purge() static const char *trc_pfx = "Cache::Purge() "; XrdOucEnv env; - XrdOss* oss = Cache::GetInstance().GetOss(); long long disk_usage; long long estimated_file_usage = m_configuration.m_diskUsageHWM; @@ -541,11 +664,12 @@ void Cache::Purge() TRACE(Info, trc_pfx << "Started."); + // Bytes to remove based on total disk usage (d) and file usage (f). 
long long bytesToRemove_d = 0, bytesToRemove_f = 0; // get amount of space to potentially erase based on total disk usage XrdOssVSInfo sP; // Make sure we start when a clean slate in each loop - if (oss->StatVS(&sP, m_configuration.m_data_space.c_str(), 1) < 0) + if (m_oss->StatVS(&sP, m_configuration.m_data_space.c_str(), 1) < 0) { TRACE(Error, trc_pfx << "can't get StatVS for oss space " << m_configuration.m_data_space); continue; @@ -633,7 +757,7 @@ void Cache::Purge() purgeState.setMinTime(time(0) - m_configuration.m_purgeColdFilesAge); } - XrdOssDF* dh = oss->newDir(m_configuration.m_username.c_str()); + XrdOssDF* dh = m_oss->newDir(m_configuration.m_username.c_str()); if (dh->Opendir("", env) == XrdOssOK) { if (m_fs_state) @@ -726,7 +850,7 @@ void Cache::Purge() } // remove info file - if (oss->Stat(infoPath.c_str(), &fstat) == XrdOssOK) + if (m_oss->Stat(infoPath.c_str(), &fstat) == XrdOssOK) { // cinfo file can be on another oss.space, do not subtract for now. // Could be relevant for very small block sizes. @@ -734,18 +858,18 @@ void Cache::Purge() // estimated_file_usage -= fstat.st_size; // ++deleted_file_count; - oss->Unlink(infoPath.c_str()); + m_oss->Unlink(infoPath.c_str()); TRACE(Dump, trc_pfx << "Removed file: '" << infoPath << "' size: " << fstat.st_size); } // remove data file - if (oss->Stat(dataPath.c_str(), &fstat) == XrdOssOK) + if (m_oss->Stat(dataPath.c_str(), &fstat) == XrdOssOK) { bytesToRemove -= it->second.nBytes; estimated_file_usage -= it->second.nBytes; ++deleted_file_count; - oss->Unlink(dataPath.c_str()); + m_oss->Unlink(dataPath.c_str()); TRACE(Dump, trc_pfx << "Removed file: '" << dataPath << "' size: " << it->second.nBytes << ", time: " << it->first); if (m_fs_state) @@ -822,3 +946,5 @@ void Cache::Purge() 5. ADD disk_usage !!!! DS:disk_usage:GAUGE:... 
*/ + +} // end XrdPfc namespace diff --git a/src/XrdPfc/XrdPfcStats.hh b/src/XrdPfc/XrdPfcStats.hh index 4902269a3cf..af913e90c60 100644 --- a/src/XrdPfc/XrdPfcStats.hh +++ b/src/XrdPfc/XrdPfcStats.hh @@ -25,7 +25,7 @@ namespace XrdPfc { //---------------------------------------------------------------------------- -//! Statistics of disk cache utilisation. +//! Statistics of cache utilisation by a File object. //---------------------------------------------------------------------------- class Stats { diff --git a/src/XrdPfc/XrdPfcVRead.cc b/src/XrdPfc/XrdPfcVRead.cc index 510cdf4ccfe..5c4a9b9b96a 100644 --- a/src/XrdPfc/XrdPfcVRead.cc +++ b/src/XrdPfc/XrdPfcVRead.cc @@ -98,24 +98,24 @@ int File::ReadV(IO *io, const XrdOucIOVec *readV, int n) std::vector chunkVec; DirectResponseHandler *direct_handler = 0; - m_downloadCond.Lock(); + m_state_cond.Lock(); if ( ! m_is_open) { - m_downloadCond.UnLock(); + m_state_cond.UnLock(); TRACEF(Error, "File::ReadV file is not open"); return io->GetInput()->ReadV(readV, n); } if (m_in_shutdown) { - m_downloadCond.UnLock(); + m_state_cond.UnLock(); return -ENOENT; } VReadPreProcess(io, readV, n, blks_to_request, blocks_to_process, blocks_on_disk, chunkVec); - m_downloadCond.UnLock(); + m_state_cond.UnLock(); // ---------------------------------------------------------------- @@ -196,7 +196,7 @@ int File::ReadV(IO *io, const XrdOucIOVec *readV, int n) } { - XrdSysCondVarHelper _lck(m_downloadCond); + XrdSysCondVarHelper _lck(m_state_cond); // Decrease ref count on the remaining blocks. // This happens when read process aborts due to encountered errors. 
@@ -227,8 +227,8 @@ bool File::VReadValidate(const XrdOucIOVec *vr, int n) { for (int i = 0; i < n; ++i) { - if (vr[i].offset < 0 || vr[i].offset >= m_fileSize || - vr[i].offset + vr[i].size > m_fileSize) + if (vr[i].offset < 0 || vr[i].offset >= m_file_size || + vr[i].offset + vr[i].size > m_file_size) { return false; } @@ -271,8 +271,9 @@ void File::VReadPreProcess(IO *io, const XrdOucIOVec *readV, int n, } else { - Block *b; - if (Cache::GetInstance().RequestRAMBlock() && (b = PrepareBlockRequest(block_idx, io, false)) != 0) + Block *b = PrepareBlockRequest(block_idx, io, false); + + if (b) { inc_ref_count(b); blocks_to_process.AddEntry(b, iov_idx, true); @@ -316,7 +317,7 @@ int File::VReadFromDisk(const XrdOucIOVec *readV, int n, ReadVBlockListDisk& blo overlap(blockIdx, m_cfi.GetBufferSize(), readV[chunkIdx].offset, readV[chunkIdx].size, off, blk_off, size); - int rs = m_output->Read(readV[chunkIdx].data + off, blockIdx*m_cfi.GetBufferSize() + blk_off - m_offset, size); + int rs = m_data_file->Read(readV[chunkIdx].data + off, blockIdx*m_cfi.GetBufferSize() + blk_off - m_offset, size); if (rs < 0) { @@ -353,7 +354,7 @@ int File::VReadProcessBlocks(IO *io, const XrdOucIOVec *readV, int n, std::vector finished; BlockList_t to_reissue; { - XrdSysCondVarHelper _lck(m_downloadCond); + XrdSysCondVarHelper _lck(m_state_cond); std::vector::iterator bi = blocks_to_process.begin(); while (bi != blocks_to_process.end()) @@ -381,7 +382,7 @@ int File::VReadProcessBlocks(IO *io, const XrdOucIOVec *readV, int n, if (finished.empty() && to_reissue.empty()) { - m_downloadCond.Wait(); + m_state_cond.Wait(); continue; } } From 454f5884d4be85d74febbc9c1877105443b26d91 Mon Sep 17 00:00:00 2001 From: Matevz Tadel Date: Mon, 1 Jun 2020 15:52:13 -0700 Subject: [PATCH 2/2] Fix Travis error about ignoring retval of posix_memalign. 
--- src/XrdPfc/XrdPfc.cc | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/XrdPfc/XrdPfc.cc b/src/XrdPfc/XrdPfc.cc index cb098cc52c9..1f670edcafe 100644 --- a/src/XrdPfc/XrdPfc.cc +++ b/src/XrdPfc/XrdPfc.cc @@ -373,8 +373,13 @@ char* Cache::RequestRAM(long long size) else { m_RAM_mutex.UnLock(); - char *buf = 0; - posix_memalign((void**) &buf, s_block_align, (size_t) size); + char *buf; + if (posix_memalign((void**) &buf, s_block_align, (size_t) size)) + { + // Report out of mem? Probably should report it at least the first time, + // then periodically. + return 0; + } return buf; } }