diff --git a/src/XrdFileCache/XrdFileCache.cc b/src/XrdFileCache/XrdFileCache.cc index fe50c5c2f0e..25f10e49fb5 100644 --- a/src/XrdFileCache/XrdFileCache.cc +++ b/src/XrdFileCache/XrdFileCache.cc @@ -61,11 +61,10 @@ void *PrefetchThread(void* ptr) return NULL; } -void *DyingFilesNeedSyncThread(void* ptr) + +namespace XrdPosixGlobals { - Cache* cache = static_cast(ptr); - cache->DyingFilesNeedSync(); - return NULL; +extern XrdScheduler *schedP; } extern "C" @@ -94,8 +93,6 @@ XrdOucCache2 *XrdOucGetCache2(XrdSysLogger *logger, pthread_t tid; XrdSysThread::Run(&tid, CacheDirCleanupThread, NULL, 0, "XrdFileCache CacheDirCleanup"); - pthread_t tids; - XrdSysThread::Run(&tids, DyingFilesNeedSyncThread, (void*)(&factory), 0, "XrdFileCache DyingFilesNeedSyncThread"); return &factory; } @@ -108,6 +105,23 @@ Cache &Cache::GetInstance() return *m_factory; } +class DiskSyncer : public XrdJob +{ +private: +File *m_file; +public: +DiskSyncer(File *f, const char *desc = "") : + XrdJob(desc), + m_file(f) +{} +void DoIt() +{ + m_file->Sync(); + Cache::GetInstance().FileSyncDone(m_file); + delete this; +} +}; + //______________________________________________________________________________ bool Cache::Decide(XrdOucCacheIO* io) @@ -137,8 +151,7 @@ Cache::Cache() : XrdOucCache(), m_trace(0), m_traceID("Manager"), m_prefetch_condVar(0), - m_RAMblocks_used(0), - m_sync_condVar(0) + m_RAMblocks_used(0) { m_trace = new XrdOucTrace(&m_log); // default log level is Warning @@ -151,7 +164,7 @@ XrdOucCacheIO2 *Cache::Attach(XrdOucCacheIO2 *io, int Options) { if (Cache::GetInstance().Decide(io)) { - TRACE(Debug, "Cache::Attach() " << io->Path()); + TRACE(Info, "Cache::Attach() " << io->Path()); IO* cio; if (Cache::GetInstance().RefConfiguration().m_hdfsmode) cio = new IOFileBlock(io, m_stats, *this); @@ -178,16 +191,6 @@ int Cache::isAttached() return true; } -void Cache::Detach(File* file) -{ - TRACE(Debug, "Cache::Detach() file = " << file); - - std::map::iterator it = m_active.find(file->GetLocalPath()); - assert (it != m_active.end()); - m_active.erase(it); - delete file; -} - //______________________________________________________________________________ void Cache::AddWriteTask(Block* b, bool fromRead) @@ -268,39 +271,133 @@ Cache::RAMBlockReleased() m_RAMblocks_used--; } -void -Cache::AddActive(File* file) -{ - XrdSysMutexHelper lock(&m_active_mutex); - m_active[file->GetLocalPath()] = file; -} +//______________________________________________________________________________ -File* Cache::GetFileWithLocalPath(std::string path, IO* iIo) +File* Cache::GetFile(const std::string& path, IO* iIO, long long off, long long filesize) { + // Called from virtual IO::Attach + + TRACE(Debug, "Cache::GetFile " << path); + XrdSysMutexHelper lock(&m_active_mutex); - std::map::iterator it = m_active.find(path); + ActiveMap_i it = m_active.find(path); + if (it != m_active.end()) { - it->second->WakeUp(iIo); + IO* prevIO = it->second->SetIO(iIO); + if (prevIO) + { + prevIO->RelinquishFile(it->second); + } + else + { + inc_ref_cnt(it->second, false); + } return it->second; } + else + { + if (filesize == 0) + { + struct stat st; + int res = iIO->Fstat(st); + if (res) { + TRACE(Error, "Cache::GetFile, could not get valid stat"); + return 0; + } + filesize = st.st_size; + } - return 0; + File* file = new File(iIO, path, off, filesize); + inc_ref_cnt(file, false); + m_active[file->GetLocalPath()] = file; + return file; + } } +void Cache::ReleaseFile(File* f) +{ + // Called from virtual IO::Detach + + TRACE(Debug, "Cache::ReleaseFile " << f->GetLocalPath()); + + f->ReleaseIO(); + dec_ref_cnt(f); +} + +//______________________________________________________________________________ + +void Cache::schedule_file_sync(File* f, bool ref_cnt_already_set) +{ + DiskSyncer* ds = new DiskSyncer(f); + if ( ! ref_cnt_already_set) inc_ref_cnt(f, true); + XrdPosixGlobals::schedP->Schedule(ds); +} + +//______________________________________________________________________________ +void Cache::FileSyncDone(File* f) +{ + dec_ref_cnt(f); +} + +//______________________________________________________________________________ +void Cache::inc_ref_cnt(File* f, bool lock) +{ + // called from GetFile() or SheduleFileSync(); + + TRACE(Debug, "Cache::inc_ref_cnt " << f->GetLocalPath()); + if (lock) m_active_mutex.Lock(); + f->inc_ref_cnt(); + if (lock) m_active_mutex.UnLock(); + +} + +//______________________________________________________________________________ +void Cache::dec_ref_cnt(File* f) +{ + // called from ReleaseFile() or DiskSync callback + + m_active_mutex.Lock(); + int cnt = f->get_ref_cnt(); + m_active_mutex.UnLock(); + + TRACE(Debug, "Cache::dec_ref_cnt " << f->GetLocalPath() << ", cnt at entry = " << cnt); + + if (cnt == 1) + { + if (f->FinalizeSyncBeforeExit()) + { + TRACE(Debug, "Cache::dec_ref_cnt " << f->GetLocalPath() << ", scheduling final sync"); + schedule_file_sync(f, true); + return; + } + } + + m_active_mutex.Lock(); + cnt = f->dec_ref_cnt(); + TRACE(Debug, "Cache::dec_ref_cnt " << f->GetLocalPath() << ", cnt after sync_check and dec_ref_cnt = " << cnt); + if (cnt == 0) + { + ActiveMap_i it = m_active.find(f->GetLocalPath()); + m_active.erase(it); + delete f; + } + m_active_mutex.UnLock(); +} + +//______________________________________________________________________________ bool Cache::HaveActiveFileWithLocalPath(std::string path) { XrdSysMutexHelper lock(&m_active_mutex); - std::map::iterator it = m_active.find(path); + ActiveMap_i it = m_active.find(path); return (it != m_active.end()); } - //============================================================================== //======================= PREFETCH =================================== //============================================================================== @@ -467,40 +564,4 @@ Cache::Prefetch() } } } - -//______________________________________________________________________________ -void -Cache::RegisterDyingFilesNeedSync(IO* io) -{ - m_sync_condVar.Lock(); - m_syncIOList.push_back(io); - m_sync_condVar.Signal(); - m_sync_condVar.UnLock(); -} - //______________________________________________________________________________ -void -Cache::DyingFilesNeedSync() -{ - while (true) { - m_sync_condVar.Lock(); - while (m_syncIOList.empty()) - { - m_sync_condVar.Wait(); - } - - std::vector::iterator it = m_syncIOList.begin(); - while ( it != m_syncIOList.end()) - { - if ((*it)->FinalizeSyncBeforeExit() == false) - { - IO* bye = *it; - delete bye; - it = m_syncIOList.erase(it); - } - else ++it; - } - m_sync_condVar.UnLock(); - XrdSysTimer::Snooze(1); - } -} diff --git a/src/XrdFileCache/XrdFileCache.hh b/src/XrdFileCache/XrdFileCache.hh index 7b488f7c68b..b68c0fa58a7 100644 --- a/src/XrdFileCache/XrdFileCache.hh +++ b/src/XrdFileCache/XrdFileCache.hh @@ -20,6 +20,7 @@ #include #include +#include "Xrd/XrdScheduler.hh" #include "XrdVersion.hh" #include "XrdSys/XrdSysPthread.hh" #include "XrdOuc/XrdOucCache2.hh" @@ -187,36 +188,36 @@ public: void Prefetch(); - //! Decrease attached count. Called from IO::Detach(). - void Detach(File*); XrdOss* GetOss() const { return m_output_fs; } XrdSysError& GetSysError() { return m_log; } - File* GetFileWithLocalPath(std::string, IO* io); + bool HaveActiveFileWithLocalPath(std::string); + + File* GetFile(const std::string&, IO*, long long off = 0, long long filesize = 0); - bool HaveActiveFileWithLocalPath(std::string); + void ReleaseFile(File*); - void AddActive(File*); + void ScheduleFileSync(File* f) { schedule_file_sync(f, false); } + void FileSyncDone(File*); + XrdOucTrace* GetTrace() { return m_trace; } - void DyingFilesNeedSync(); - void RegisterDyingFilesNeedSync(IO*); - private: bool ConfigParameters(std::string, XrdOucStream&, TmpConfiguration &tmpc); bool ConfigXeq(char *, XrdOucStream &); bool xdlib(XrdOucStream &); bool xtrace(XrdOucStream &); + static Cache *m_factory; //!< this object - XrdSysError m_log; //!< XrdFileCache namespace logger - XrdOucTrace* m_trace; - const char* m_traceID; + XrdSysError m_log; //!< XrdFileCache namespace logger + XrdOucTrace *m_trace; + const char *m_traceID; - XrdOucCacheStats m_stats; //!< + XrdOucCacheStats m_stats; //!< XrdOss *m_output_fs; //!< disk cache file system std::vector m_decisionpoints; //!< decision plugins @@ -225,37 +226,36 @@ private: Configuration m_configuration; //!< configurable parameters - XrdSysCondVar m_prefetch_condVar; //!< central lock for this class + XrdSysCondVar m_prefetch_condVar; //!< central lock for this class - XrdSysMutex m_RAMblock_mutex; //!< central lock for this class - int m_RAMblocks_used; + XrdSysMutex m_RAMblock_mutex; //!< central lock for this class + int m_RAMblocks_used; struct WriteQ { WriteQ() : condVar(0), size(0) {} - XrdSysCondVar condVar; //!< write list condVar - size_t size; //!< cache size of a container - std::list queue; //!< container + XrdSysCondVar condVar; //!< write list condVar + size_t size; //!< cache size of a container + std::list queue; //!< container }; WriteQ m_writeQ; - struct DiskNetIO - { - DiskNetIO(IO* iIO, File* iFile) : io(iIO), file(iFile){} - IO* io; - File* file; - }; + // active map + typedef std::map ActiveMap_t; + typedef ActiveMap_t::iterator ActiveMap_i; - std::map m_active; - XrdSysMutex m_active_mutex; + ActiveMap_t m_active; + XrdSysMutex m_active_mutex; + + void inc_ref_cnt(File*, bool lock); + void dec_ref_cnt(File*); + + void schedule_file_sync(File*, bool ref_cnt_already_set); // prefetching typedef std::vector PrefetchList; PrefetchList m_prefetchList; - - std::vector m_syncIOList; - XrdSysCondVar m_sync_condVar; }; } diff --git a/src/XrdFileCache/XrdFileCacheFile.cc b/src/XrdFileCache/XrdFileCacheFile.cc index e24d9b4cb09..f82c244f465 100644 --- a/src/XrdFileCache/XrdFileCacheFile.cc +++ b/src/XrdFileCache/XrdFileCacheFile.cc @@ -31,17 +31,12 @@ #include "XrdSys/XrdSysTimer.hh" #include "XrdOss/XrdOss.hh" #include "XrdOuc/XrdOucEnv.hh" -#include "Xrd/XrdScheduler.hh" #include "XrdSfs/XrdSfsInterface.hh" #include "XrdPosix/XrdPosixFile.hh" #include "XrdPosix/XrdPosix.hh" #include "XrdFileCache.hh" #include "Xrd/XrdScheduler.hh" -namespace XrdPosixGlobals -{ -extern XrdScheduler *schedP; -} using namespace XrdFileCache; @@ -49,20 +44,7 @@ namespace { const int PREFETCH_MAX_ATTEMPTS = 10; -class DiskSyncer : public XrdJob -{ -private: -File *m_file; -public: -DiskSyncer(File *pref, const char *desc = "") : - XrdJob(desc), - m_file(pref) -{} -void DoIt() -{ - m_file->Sync(); -} -}; + Cache* cache() { return &Cache::GetInstance(); } } @@ -71,16 +53,16 @@ const char *File::m_traceID = "File"; //------------------------------------------------------------------------------ -File::File(IO *io, std::string& disk_file_path, long long iOffset, long long iFileSize) : +File::File(IO *io, const std::string& path, long long iOffset, long long iFileSize) : + m_ref_cnt(0), m_is_open(false), m_io(io), m_output(0), m_infoFile(0), m_cfi(Cache::GetInstance().GetTrace(), Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks > 0), - m_temp_filename(disk_file_path), + m_filename(path), m_offset(iOffset), m_fileSize(iFileSize), - m_syncer(new DiskSyncer(this, "XrdFileCache::DiskSyncer")), m_non_flushed_cnt(0), m_in_sync(false), m_downloadCond(0), @@ -111,9 +93,6 @@ File::~File() m_output = NULL; } - delete m_syncer; - m_syncer = NULL; - TRACEF(Debug, "File::~File() ended, prefetch score = " << m_prefetchScore); } @@ -146,7 +125,6 @@ bool File::ioActive() cache()->DeRegisterPrefetchFile(this); } - // High debug print // for (BlockMap_i it = m_block_map.begin(); it != m_block_map.end(); ++it) // { @@ -172,65 +150,71 @@ bool File::ioActive() } } - blockMapEmpty = m_block_map.empty(); + blockMapEmpty = m_block_map.empty(); } - return !blockMapEmpty; } -bool File::FinalizeSyncBeforeExit() -{ - // returns true if sync is required - // this method is called after corresponding IO is detached from PosixCache - - bool schedule_sync = false; - { - XrdSysCondVarHelper _lck(m_downloadCond); +//------------------------------------------------------------------------------ - if (m_in_sync) return true; +void File::RequestSyncOfDetachStats() +{ + XrdSysCondVarHelper _lck(m_downloadCond); + m_detachTimeIsLogged = false; +} - if (m_writes_during_sync.empty() && m_non_flushed_cnt == 0) - { - if (! m_detachTimeIsLogged) - { - m_cfi.WriteIOStatDetach(m_stats); - m_detachTimeIsLogged = true; - schedule_sync = true; - } - } - else - { - // write leftovers - schedule_sync = true; - } +bool File::FinalizeSyncBeforeExit() +{ + // Returns true if sync is required. + // This method is called after corresponding IO is detached from PosixCache. - if (schedule_sync) - m_in_sync = true; - } + XrdSysCondVarHelper _lck(m_downloadCond); - if (schedule_sync) + if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detachTimeIsLogged) { - XrdPosixGlobals::schedP->Schedule(m_syncer); + m_cfi.WriteIOStatDetach(m_stats); + m_detachTimeIsLogged = true; + TRACEF(Debug, "File::FinalizeSyncBeforeExit scheduling sync to write detach stats"); return true; } - else - { - return false; - } + TRACEF(Debug, "File::FinalizeSyncBeforeExit sync not required"); + return false; } //------------------------------------------------------------------------------ -void File::WakeUp(IO *io) +void File::ReleaseIO() +{ + // called from Cache::ReleaseFile + + m_downloadCond.Lock(); + m_io = 0; + m_downloadCond.UnLock(); + } + +//------------------------------------------------------------------------------ + +IO* File::SetIO(IO *io) { - // called if this object is recycled by other IO + // called if this object is recycled by other IO or detached from cache + + bool cacheActivatePrefetch = false; + + TRACEF(Debug, "File::SetIO() " << (void*)io); + IO* oldIO = m_io; m_downloadCond.Lock(); - m_io->RelinquishFile(this); m_io = io; - if (m_prefetchState != kComplete) m_prefetchState = kOn; + if (io && m_prefetchState != kComplete) + { + cacheActivatePrefetch = true; + m_prefetchState = kOn; + } m_downloadCond.UnLock(); + + if (cacheActivatePrefetch) cache()->RegisterPrefetchFile(this); + return oldIO; } //------------------------------------------------------------------------------ @@ -247,24 +231,24 @@ bool File::Open() char size_str[16]; sprintf(size_str, "%lld", m_fileSize); myEnv.Put("oss.asize", size_str); myEnv.Put("oss.cgroup", Cache::GetInstance().RefConfiguration().m_data_space.c_str()); - if (myOss.Create(myUser, m_temp_filename.c_str(), 0600, myEnv, XRDOSS_mkpath) != XrdOssOK) + if (myOss.Create(myUser, m_filename.c_str(), 0600, myEnv, XRDOSS_mkpath) != XrdOssOK) { - TRACEF(Error, "File::Open() Create failed for data file " << m_temp_filename + TRACEF(Error, "File::Open() Create failed for data file " << m_filename << ", err=" << strerror(errno)); return false; } m_output = myOss.newFile(myUser); - if (m_output->Open(m_temp_filename.c_str(), O_RDWR, 0600, myEnv) != XrdOssOK) + if (m_output->Open(m_filename.c_str(), O_RDWR, 0600, myEnv) != XrdOssOK) { - TRACEF(Error, "File::Open() Open failed for data file " << m_temp_filename + TRACEF(Error, "File::Open() Open failed for data file " << m_filename << ", err=" << strerror(errno)); delete m_output; m_output = 0; return false; } // Create the info file - std::string ifn = m_temp_filename + Info::m_infoExtension; + std::string ifn = m_filename + Info::m_infoExtension; struct stat infoStat; bool fileExisted = (myOss.Stat(ifn.c_str(), &infoStat) == XrdOssOK); @@ -282,8 +266,8 @@ bool File::Open() m_infoFile = myOss.newFile(myUser); if (m_infoFile->Open(ifn.c_str(), O_RDWR, 0600, myEnv) != XrdOssOK) { - TRACEF(Error, "File::Open() Open failed for info file " << ifn - << ", err=" << strerror(errno)); + TRACEF(Error, "File::Open() Open failed for info file " << ifn << ", err=" << strerror(errno)); + delete m_infoFile; m_infoFile = 0; delete m_output; m_output = 0; return false; @@ -777,7 +761,7 @@ void File::WriteBlockToDisk(Block* b) if (schedule_sync) { - XrdPosixGlobals::schedP->Schedule(m_syncer); + cache()->ScheduleFileSync(this); } } @@ -889,7 +873,7 @@ long long File::BufferSize() const char* File::lPath() const { - return m_temp_filename.c_str(); + return m_filename.c_str(); } //------------------------------------------------------------------------------ diff --git a/src/XrdFileCache/XrdFileCacheFile.hh b/src/XrdFileCache/XrdFileCacheFile.hh index e79ee802755..24389f9a123 100644 --- a/src/XrdFileCache/XrdFileCacheFile.hh +++ b/src/XrdFileCache/XrdFileCacheFile.hh @@ -128,8 +128,7 @@ public: //------------------------------------------------------------------------ //! Constructor. //------------------------------------------------------------------------ - File(IO *io, std::string &path, - long long offset, long long fileSize); + File(IO *io, const std::string &path, long long offset, long long fileSize); //------------------------------------------------------------------------ //! Destructor. @@ -157,11 +156,17 @@ public: bool ioActive(); //---------------------------------------------------------------------- - //! \brief I. Return true if any of blocks need sync. - //! Called from IO::DyingFilesNeedSync() + //! \brief Flags that detach stats should be written out in final sync. + //! Called from CacheIO upon Detach. + //---------------------------------------------------------------------- + void RequestSyncOfDetachStats(); + + //---------------------------------------------------------------------- + //! \brief Returns true if any of blocks need sync. + //! Called from Cache::dec_ref_cnt on zero ref cnt //---------------------------------------------------------------------- bool FinalizeSyncBeforeExit(); - + //---------------------------------------------------------------------- //! Sync file cache inf o and output data with disk //---------------------------------------------------------------------- @@ -182,43 +187,49 @@ public: //! Log path const char* lPath() const; - std::string GetLocalPath() { return m_temp_filename; } + std::string GetLocalPath() { return m_filename; } XrdOucTrace* GetTrace(); long long GetFileSize() { return m_fileSize; } - void WakeUp(IO* io); + IO* SetIO(IO* io); + void ReleaseIO(); + // These three methods are called under Cache's m_active lock + int get_ref_cnt() { return m_ref_cnt; } + int inc_ref_cnt() { return ++m_ref_cnt; } + int dec_ref_cnt() { return --m_ref_cnt; } private: enum PrefetchState_e { kOff=-1, kOn, kHold, kStopped, kComplete }; - bool m_is_open; //!< open state + int m_ref_cnt; //!< number of references from IO or sync + + bool m_is_open; //!< open state - IO *m_io; //!< original data source - XrdOssDF *m_output; //!< file handle for data file on disk - XrdOssDF *m_infoFile; //!< file handle for data-info file on disk - Info m_cfi; //!< download status of file blocks and access statistics + IO *m_io; //!< original data source + XrdOssDF *m_output; //!< file handle for data file on disk + XrdOssDF *m_infoFile; //!< file handle for data-info file on disk + Info m_cfi; //!< download status of file blocks and access statistics - std::string m_temp_filename; //!< filename of data file on disk + std::string m_filename; //!< filename of data file on disk long long m_offset; //!< offset of cached file for block-based operation long long m_fileSize; //!< size of cached disk file for block-based operation // fsync - XrdJob *m_syncer; std::vector m_writes_during_sync; - int m_non_flushed_cnt; + int m_non_flushed_cnt; bool m_in_sync; - typedef std::list IntList_t; - typedef IntList_t::iterator IntList_i; + typedef std::list IntList_t; + typedef IntList_t::iterator IntList_i; - typedef std::list BlockList_t; + typedef std::list BlockList_t; typedef BlockList_t::iterator BlockList_i; - typedef std::map BlockMap_t; - typedef BlockMap_t::iterator BlockMap_i; + typedef std::map BlockMap_t; + typedef BlockMap_t::iterator BlockMap_i; BlockMap_t m_block_map; @@ -277,8 +288,6 @@ private: int offsetIdx(int idx); }; - - } #endif diff --git a/src/XrdFileCache/XrdFileCacheIO.hh b/src/XrdFileCache/XrdFileCacheIO.hh index 2b9a988405e..44bee416f57 100644 --- a/src/XrdFileCache/XrdFileCacheIO.hh +++ b/src/XrdFileCache/XrdFileCacheIO.hh @@ -35,24 +35,21 @@ public: virtual void RelinquishFile(File*) = 0; - virtual bool FinalizeSyncBeforeExit() = 0; - - XrdOucTrace* GetTrace() {return m_cache.GetTrace(); } + XrdOucTrace* GetTrace() { return m_cache.GetTrace(); } XrdOucCacheIO2* GetInput(); protected: - XrdOucCacheStats &m_statsGlobal; //!< reference to Cache statistics - Cache &m_cache; //!< reference to Cache needed in detach + XrdOucCacheStats &m_statsGlobal; //!< reference to Cache statistics + Cache &m_cache; //!< reference to Cache needed in detach - const char* m_traceID; - std::string m_path; - const char* GetPath() { return m_path.c_str(); } + const char *m_traceID; + std::string m_path; + const char* GetPath() { return m_path.c_str(); } - bool m_syncDiskAfterDetach; private: - XrdOucCacheIO2 *m_io; //!< original data source - XrdSysMutex updMutex; + XrdOucCacheIO2 *m_io; //!< original data source + XrdSysMutex updMutex; void SetInput(XrdOucCacheIO2*); }; } diff --git a/src/XrdFileCache/XrdFileCacheIOEntireFile.cc b/src/XrdFileCache/XrdFileCacheIOEntireFile.cc index 5969d09af66..26057690906 100644 --- a/src/XrdFileCache/XrdFileCacheIOEntireFile.cc +++ b/src/XrdFileCache/XrdFileCacheIOEntireFile.cc @@ -32,30 +32,14 @@ using namespace XrdFileCache; //______________________________________________________________________________ - - -IOEntireFile::IOEntireFile(XrdOucCacheIO2 *io, XrdOucCacheStats &stats, Cache & cache) - : IO(io, stats, cache), +IOEntireFile::IOEntireFile(XrdOucCacheIO2 *io, XrdOucCacheStats &stats, Cache & cache) : + IO(io, stats, cache), m_file(0), m_localStat(0) { XrdCl::URL url(GetInput()->Path()); std::string fname = url.GetPath(); - - m_file = Cache::GetInstance().GetFileWithLocalPath(fname, this); - if (! m_file) - { - struct stat st; - int res = Fstat(st); - - // This should not happen, but make a printout to see it. - if (res) - TRACEIO(Error, "IOEntireFile::IOEntireFile, could not get valid stat"); - - m_file = new File(this, fname, 0, st.st_size); - } - - Cache::GetInstance().AddActive(m_file); + m_file = Cache::GetInstance().GetFile(fname, this); } //______________________________________________________________________________ @@ -63,12 +47,8 @@ IOEntireFile::~IOEntireFile() { // called from Detach() if no sync is needed or // from Cache's sync thread - TRACEIO(Debug, "IOEntireFile::~IOEntireFile() "); + TRACEIO(Debug, "IOEntireFile::~IOEntireFile() " << this); - if (m_file) { - m_cache.Detach(m_file); - m_file = 0; - } delete m_localStat; } @@ -97,15 +77,6 @@ long long IOEntireFile::FSize() } //______________________________________________________________________________ -void IOEntireFile::RelinquishFile(File* f) -{ - TRACEIO(Info, "IOEntireFile::RelinquishFile"); - assert(m_file == f); - m_file = 0; -} - -//______________________________________________________________________________ - int IOEntireFile::initCachedStat(const char* path) { // Called indirectly from the constructor. @@ -157,52 +128,45 @@ int IOEntireFile::initCachedStat(const char* path) //______________________________________________________________________________ bool IOEntireFile::ioActive() { - if ( ! m_file) - { - return false; - } - else - { - return m_file->ioActive(); - } + XrdSysMutexHelper lock(&m_mutex); + bool active = m_file && m_file->ioActive(); + return active; } //______________________________________________________________________________ +void IOEntireFile::RelinquishFile(File*) +{ + // Called from Cache::GetFile + + TRACE(Debug, "IOEntireFile::RelinquishFile() " << this); + + XrdSysMutexHelper lock(&m_mutex); + m_file = 0; +} +//______________________________________________________________________________ XrdOucCacheIO *IOEntireFile::Detach() { // Called from XrdPosixFile destructor - - TRACEIO(Debug, "IOEntireFile::Detach() "); - XrdOucCacheIO * io = GetInput(); + TRACE(Info, "IOEntireFile::Detach() " << this); - if ( ! FinalizeSyncBeforeExit() ) { - delete this; - } - else - { - m_cache.RegisterDyingFilesNeedSync(this); + XrdSysMutexHelper lock(&m_mutex); + if (m_file) + { + m_file->RequestSyncOfDetachStats(); + Cache::GetInstance().ReleaseFile(m_file); + m_file = 0; + } } - + XrdOucCacheIO *io = GetInput(); + delete this; return io; } - //______________________________________________________________________________ - -bool IOEntireFile::FinalizeSyncBeforeExit() -{ - if (m_file) - return m_file->FinalizeSyncBeforeExit(); - else - return false; -} - -//______________________________________________________________________________ - -int IOEntireFile::Read (char *buff, long long off, int size) +int IOEntireFile::Read(char *buff, long long off, int size) { TRACEIO(Dump, "IOEntireFile::Read() "<< this << " off: " << off << " size: " << size ); diff --git a/src/XrdFileCache/XrdFileCacheIOEntireFile.hh b/src/XrdFileCache/XrdFileCacheIOEntireFile.hh index f4d1029379e..89c656a30ad 100644 --- a/src/XrdFileCache/XrdFileCacheIOEntireFile.hh +++ b/src/XrdFileCache/XrdFileCacheIOEntireFile.hh @@ -82,8 +82,6 @@ public: //! Called to check if destruction needs to be done in a separate task. virtual bool ioActive(); - virtual bool FinalizeSyncBeforeExit(); - virtual int Fstat(struct stat &sbuff); virtual long long FSize(); @@ -91,9 +89,10 @@ public: virtual void RelinquishFile(File*); private: - File* m_file; - struct stat *m_localStat; - int initCachedStat(const char* path); + XrdSysMutex m_mutex; + File *m_file; + struct stat *m_localStat; + int initCachedStat(const char* path); }; } diff --git a/src/XrdFileCache/XrdFileCacheIOFileBlock.cc b/src/XrdFileCache/XrdFileCacheIOFileBlock.cc index 0290cfda57f..93d5e45bb67 100644 --- a/src/XrdFileCache/XrdFileCacheIOFileBlock.cc +++ b/src/XrdFileCache/XrdFileCacheIOFileBlock.cc @@ -36,8 +36,8 @@ using namespace XrdFileCache; //______________________________________________________________________________ -IOFileBlock::IOFileBlock(XrdOucCacheIO2 *io, XrdOucCacheStats &statsGlobal, Cache & cache) - : IO(io, statsGlobal, cache), m_localStat(0), m_info(cache.GetTrace(), false), m_infoFile(0) +IOFileBlock::IOFileBlock(XrdOucCacheIO2 *io, XrdOucCacheStats &statsGlobal, Cache & cache) : + IO(io, statsGlobal, cache), m_localStat(0), m_info(cache.GetTrace(), false), m_infoFile(0) { m_blocksize = Cache::GetInstance().RefConfiguration().m_hdfsbsize; GetBlockSizeFromPath(); @@ -52,13 +52,6 @@ IOFileBlock::~IOFileBlock() // from Cache's sync thread TRACEIO(Debug, "deleting IOFileBlock"); - - while (! m_blocks.empty()) - { - std::map::iterator it = m_blocks.begin(); - m_cache.Detach(it->second); - m_blocks.erase(it); - } } //______________________________________________________________________________ @@ -66,34 +59,25 @@ XrdOucCacheIO* IOFileBlock::Detach() { // Called from XrdPosixFile destructor - TRACEIO(Debug, "detach IOFileBlock"); - - XrdOucCacheIO * io = GetInput(); + TRACEIO(Info, "Detach IOFileBlock"); - // call need sync on all - if ( ! FinalizeSyncBeforeExit() ) - { - delete this; - } - else + CloseInfoFile(); { - m_cache.RegisterDyingFilesNeedSync(this); + XrdSysMutexHelper lock(&m_mutex); + for (std::map::iterator it = m_blocks.begin(); it != m_blocks.end(); ++it) + { + if (it->second) + { + it->second->RequestSyncOfDetachStats(); + m_cache.ReleaseFile(it->second); + } + } } - + XrdOucCacheIO *io = GetInput(); + delete this; return io; } -//______________________________________________________________________________ -bool IOFileBlock::FinalizeSyncBeforeExit() -{ - bool syncDone = false; - for (std::map::iterator it = m_blocks.begin(); it != m_blocks.end(); ++it) - { - bool s = it->second->FinalizeSyncBeforeExit(); - syncDone = syncDone | s; - } - return syncDone; -} //______________________________________________________________________________ void IOFileBlock::CloseInfoFile() @@ -157,13 +141,7 @@ File* IOFileBlock::newBlockFile(long long off, int blocksize) TRACEIO(Debug, "FileBlock::FileBlock(), create XrdFileCacheFile "); - File* file = Cache::GetInstance().GetFileWithLocalPath(fname, this); - if (! file) - { - file = new File(this, fname, off, blocksize); - Cache::GetInstance().AddActive(file); - } - + File* file = Cache::GetInstance().GetFile(fname, this, off, blocksize); return file; } @@ -265,30 +243,25 @@ int IOFileBlock::initLocalStat() //______________________________________________________________________________ void IOFileBlock::RelinquishFile(File* f) { - // called from Cache::Detach() or Cache::GetFileWithLocalPath() - // the object is in process of dying - + // called from cache::GetFile if File f changes ownership + XrdSysMutexHelper lock(&m_mutex); for (std::map::iterator it = m_blocks.begin(); it != m_blocks.end(); ++it) { if (it->second == f) - { - m_blocks.erase(it); - break; - } + it->second = 0; } } - //______________________________________________________________________________ bool IOFileBlock::ioActive() { - CloseInfoFile(); - XrdSysMutexHelper lock(&m_mutex); bool active = false; for (std::map::iterator it = m_blocks.begin(); it != m_blocks.end(); ++it) { - if (it->second->ioActive()) { + // need to initiate stop on all File / block objects + if (it->second && it->second->ioActive()) + { active = true; } } diff --git a/src/XrdFileCache/XrdFileCacheIOFileBlock.hh b/src/XrdFileCache/XrdFileCacheIOFileBlock.hh index c6457f4cde5..b8360957201 100644 --- a/src/XrdFileCache/XrdFileCacheIOFileBlock.hh +++ b/src/XrdFileCache/XrdFileCacheIOFileBlock.hh @@ -63,8 +63,6 @@ public: //! \brief Virtual method of XrdOucCacheIO. //! Called to check if destruction needs to be done in a separate task. virtual bool ioActive(); - - virtual bool FinalizeSyncBeforeExit(); virtual int Fstat(struct stat &sbuff); @@ -80,8 +78,8 @@ private: Info m_info; XrdOssDF* m_infoFile; - void GetBlockSizeFromPath(); - int initLocalStat(); + void GetBlockSizeFromPath(); + int initLocalStat(); File* newBlockFile(long long off, int blocksize); void CloseInfoFile(); }; diff --git a/src/XrdFileCache/XrdFileCacheInfo.cc b/src/XrdFileCache/XrdFileCacheInfo.cc index c3edd13db25..f47e2d44b7b 100644 --- a/src/XrdFileCache/XrdFileCacheInfo.cc +++ b/src/XrdFileCache/XrdFileCacheInfo.cc @@ -306,7 +306,7 @@ void Info::GetCksum( unsigned char* buff, char* digest) //------------------------------------------------------------------------------ void Info::DisableDownloadStatus() { - // use version sign to skip downlaod status + // use version sign to skip download status m_store.m_version = -m_store.m_version; } //------------------------------------------------------------------------------ @@ -364,7 +364,7 @@ void Info::WriteIOStatDetach(Stats& s) void Info::WriteIOStatAttach() { m_store.m_accessCnt++; - if ( m_store.m_astats.size() >= m_maxNumAccess) + if (m_store.m_astats.size() >= m_maxNumAccess) m_store.m_astats.erase( m_store.m_astats.begin()); AStat as;