diff --git a/src/XrdFileCache/README b/src/XrdFileCache/README index b0f00ca723c..e4e360a5fb5 100644 --- a/src/XrdFileCache/README +++ b/src/XrdFileCache/README @@ -129,7 +129,7 @@ pfc.osslib [] path to alternative plign for output file system pfc.decisionlib [] path to decision library and plugin parameters -pfc.trace default level is none, xrootd option -d sets debug level +pfc.trace default level is warning, xrootd option -d sets debug level Examples diff --git a/src/XrdFileCache/XrdFileCache.cc b/src/XrdFileCache/XrdFileCache.cc index accaaf8445b..ced8ab84cab 100644 --- a/src/XrdFileCache/XrdFileCache.cc +++ b/src/XrdFileCache/XrdFileCache.cc @@ -138,6 +138,8 @@ Cache::Cache() : XrdOucCache(), m_RAMblocks_used(0) { m_trace = new XrdOucTrace(&m_log); + // default log level is Warning + m_trace->What = 2; } //______________________________________________________________________________ @@ -181,9 +183,12 @@ void Cache::Detach(XrdOucCacheIO* io) while ( it != m_active.end() ) { if (it->io == io) { - it->io->RelinquishFile(it->file); - delete it->file; + if (it->file) { + it->io->RelinquishFile(it->file); + delete it->file; + } m_active.erase(it); + break; } else ++it; @@ -302,9 +307,10 @@ File* Cache::GetFileWithLocalPath(std::string path, IO* iIo) { if (!strcmp(path.c_str(), it->file->lPath())) { - it->io->RelinquishFile(it->file); - it->io = iIo; - return it->file; + File *ff = it->file; + it->io->RelinquishFile(ff); + it->file = 0; + return ff; } } return 0; @@ -371,7 +377,7 @@ Cache::GetNextFileToPrefetch() size_t l = m_prefetchList.size(); int idx = rand() % l; File* f = m_prefetchList[idx]; - f->MarkPrefetch(); + m_prefetch_condVar.UnLock(); return f; } diff --git a/src/XrdFileCache/XrdFileCacheFile.cc b/src/XrdFileCache/XrdFileCacheFile.cc index 6edd96789da..9c1adb35a19 100644 --- a/src/XrdFileCache/XrdFileCacheFile.cc +++ b/src/XrdFileCache/XrdFileCacheFile.cc @@ -79,7 +79,6 @@ m_cfi(Cache::GetInstance().GetTrace(), Cache::GetInstance().RefConfiguration().m m_temp_filename(disk_file_path), m_offset(iOffset), m_fileSize(iFileSize), -m_stopping(false), m_stateCond(0), // We will explicitly lock the condition before use. m_syncer(new DiskSyncer(this, "XrdFileCache::DiskSyncer")), m_non_flushed_cnt(0), @@ -89,7 +88,6 @@ m_prefetchState(kOn), m_prefetchReadCnt(0), m_prefetchHitCnt(0), m_prefetchScore(1), -m_prefetchCurrentCnt(0), m_traceID("File") { Open(); @@ -117,6 +115,9 @@ File::~File() AppendIOStatToFileInfo(); m_infoFile->Fsync(); + delete m_syncer; + m_syncer = NULL; + if (m_output) { m_output->Close(); @@ -134,63 +135,53 @@ File::~File() TRACEF(Debug, "File::~File() ended, prefetch score = " << m_prefetchScore); } -bool File::InitiateClose() +bool File::ioActive() { // Retruns true if delay is needed TRACEF(Debug, "File::Initiate close start"); m_stateCond.Lock(); - if (!m_stopping) { - m_prefetchState = kCanceled; - cache()->DeRegisterPrefetchFile(this); - m_stopping = true; + if (m_prefetchState != kStopped) { + m_prefetchState = kStopped; + cache()->DeRegisterPrefetchFile(this); } - m_stateCond.UnLock(); - m_stateCond.Lock(); - bool isPrefetching = (m_prefetchCurrentCnt > 0); m_stateCond.UnLock(); - if (isPrefetching == false) - { - m_downloadCond.Lock(); - - /* - // dump print - for (BlockMap_i it = m_block_map.begin(); it != m_block_map.end(); ++it) { - Block* b = it->second; - TRACEF(Dump, "File::InitiateClose() block idx = " << b->m_offset/m_cfi.GetBufferSize() << " prefetch = " << b->preferch << " refcnt " << b->refcnt); + // remove failed blocks and check if map is empty + m_downloadCond.Lock(); + /* + // high debug print + for (BlockMap_i it = m_block_map.begin(); it != m_block_map.end(); ++it) { + Block* b = it->second; + TRACEF(Dump, "File::InitiateClose() block idx = " << b->m_offset/m_cfi.GetBufferSize() << " prefetch = " << b->preferch << " refcnt " << b->refcnt); + + } + */ + BlockMap_i itr = m_block_map.begin(); + while (itr != m_block_map.end()) { + if (itr->second->is_failed() && itr->second->m_refcnt == 1) { + BlockMap_i toErase = itr; + ++itr; + TRACEF(Debug, "Remove failed block " << itr->second->m_offset/m_cfi.GetBufferSize()); + free_block(toErase->second); } - */ - - // remove failed blocks - BlockMap_i itr = m_block_map.begin(); - while (itr != m_block_map.end()) { - if (itr->second->is_failed() && itr->second->m_refcnt == 1) { - BlockMap_i toErase = itr; - ++itr; - TRACEF(Debug, "Remove failed block " << itr->second->m_offset/m_cfi.GetBufferSize()); - free_block(toErase->second); - } - else { - ++itr; - } + else { + ++itr; } + } - bool blockMapEmpty = m_block_map.empty(); - m_downloadCond.UnLock(); + bool blockMapEmpty = m_block_map.empty(); + m_downloadCond.UnLock(); - if ( blockMapEmpty) - { - // file is not active when block map is empty and sync is done - XrdSysMutexHelper _lck(&m_syncStatusMutex); - if (m_in_sync == false) { - delete m_syncer; - m_syncer = NULL; - return false; - } + if ( blockMapEmpty) + { + // file is not active when block map is empty and sync is done + XrdSysMutexHelper _lck(&m_syncStatusMutex); + if (m_in_sync == false) { + return false; } } @@ -199,7 +190,13 @@ bool File::InitiateClose() //______________________________________________________________________________ - +void File::WakeUp() +{ + // called if this object is recycled by other IO + m_stateCond.Lock(); + if (m_prefetchState != kComplete) m_prefetchState = kOn; + m_stateCond.UnLock(); +} //============================================================================== @@ -231,10 +228,15 @@ bool File::Open() // Create the info file std::string ifn = m_temp_filename + Info::m_infoExtension; + + struct stat infoStat; + bool fileExisted = (Cache::GetInstance().GetOss()->Stat(ifn.c_str(), &infoStat) == XrdOssOK); + m_output_fs.Create(Cache::GetInstance().RefConfiguration().m_username.c_str(), ifn.c_str(), 0600, myEnv, XRDOSS_mkpath); m_infoFile = m_output_fs.newFile(Cache::GetInstance().RefConfiguration().m_username.c_str()); if (m_infoFile) { + if (fileExisted) assert(infoStat.st_size > 0); int res = m_infoFile->Open(ifn.c_str(), O_RDWR, 0600, myEnv); if (res < 0) { @@ -243,28 +245,34 @@ bool File::Open() m_infoFile = 0; return false; } + else { + if (fileExisted) + { + int res = m_cfi.Read(m_infoFile); + TRACEF(Debug, "Reading existing info file bytes = " << res); + m_downloadCond.Lock(); + // this method is called from constructor, no need to lock downloadStaus + bool complete = m_cfi.IsComplete(); + if (complete) m_prefetchState = kComplete; + m_downloadCond.UnLock(); + } + else { + m_fileSize = m_fileSize; + int ss = (m_fileSize - 1)/m_cfi.GetBufferSize() + 1; + TRACEF(Debug, "Creating new file info, data size = " << m_fileSize << " num blocks = " << ss); + m_cfi.SetBufferSize(Cache::GetInstance().RefConfiguration().m_bufferSize); + m_cfi.SetFileSize(m_fileSize); + m_cfi.WriteHeader(m_infoFile); + m_infoFile->Fsync(); + } + } } else { + // this should be a rare case wher FD can't be created return false; } - - if (m_cfi.Read(m_infoFile) <= 0) - { - m_fileSize = m_fileSize; - int ss = (m_fileSize - 1)/m_cfi.GetBufferSize() + 1; - TRACEF(Debug, "Creating new file info, data size = " << m_fileSize << " num blocks = " << ss); - m_cfi.SetBufferSize(Cache::GetInstance().RefConfiguration().m_bufferSize); - m_cfi.SetFileSize(m_fileSize); - m_cfi.WriteHeader(m_infoFile); - m_infoFile->Fsync(); - } - else - { - TRACEF(Debug, "Successfully opened existing info file"); - } - - cache()->RegisterPrefetchFile(this); + if (m_prefetchState != kComplete) cache()->RegisterPrefetchFile(this); return true; } @@ -808,17 +816,9 @@ void File::ProcessBlockResponse(Block* b, int res) if (res >= 0) { b->m_downloaded = true; - TRACEF(Dump, "File::ProcessBlockResponse " << (int)(b->m_offset/BufferSize()) << " finished " << b->is_finished()); - if (!m_stopping) { // AMT theoretically this should be under state lock, but then are double locks - TRACEF(Dump, "File::ProcessBlockResponse inc_ref_count " << (int)(b->m_offset/BufferSize())); - inc_ref_count(b); - cache()->AddWriteTask(b, true); - } - else { - // there is no refcount +/- to remove dropped prefetched blocks on destruction - if (b->m_prefetch && (b->m_refcnt == 0)) - free_block(b); - } + TRACEF(Dump, "File::ProcessBlockResponse inc_ref_count " << (int)(b->m_offset/BufferSize())); + inc_ref_count(b); + cache()->AddWriteTask(b, true); } else { @@ -879,43 +879,42 @@ void File::AppendIOStatToFileInfo() //______________________________________________________________________________ void File::Prefetch() { - if (m_prefetchState == kOn) { - TRACEF(Dump, "File::Prefetch enter to check download status"); + XrdSysCondVarHelper _lck(m_stateCond); + if (m_prefetchState != kOn) + return; + } + + // check index not on disk and not in RAM + TRACEF(Dump, "File::Prefetch enter to check download status"); + bool found = false; + for (int f=0; f < m_cfi.GetSizeInBits(); ++f) + { XrdSysCondVarHelper _lck(m_downloadCond); - // clLog()->Dump(XrdCl::AppMsg, "File::Prefetch enter to check download status BEGIN %s \n", lPath()); - - // check index not on disk and not in RAM - bool found = false; - for (int f=0; f < m_cfi.GetSizeInBits(); ++f) - { - // clLog()->Dump(XrdCl::AppMsg, "File::Prefetch test bit %d", f); - if (!m_cfi.TestBit(f)) - { - f += m_offset/m_cfi.GetBufferSize(); - BlockMap_i bi = m_block_map.find(f); - if (bi == m_block_map.end()) { - TRACEF(Dump, "File::Prefetch take block " << f); - cache()->RequestRAMBlock(); - RequestBlock(f, true); - m_prefetchReadCnt++; - m_prefetchScore = float(m_prefetchHitCnt)/m_prefetchReadCnt; - found = true; - break; - } + if (!m_cfi.TestBit(f)) + { + f += m_offset/m_cfi.GetBufferSize(); + BlockMap_i bi = m_block_map.find(f); + if (bi == m_block_map.end()) { + TRACEF(Dump, "File::Prefetch take block " << f); + cache()->RequestRAMBlock(); + RequestBlock(f, true); + m_prefetchReadCnt++; + m_prefetchScore = float(m_prefetchHitCnt)/m_prefetchReadCnt; + found = true; + break; } } - if (!found) { - TRACEF(Dump, "File::Prefetch no free block found "); - m_cfi.CheckComplete(); - // it is possible all missing blocks are in map but downlaoded status is still not complete - // assert (m_cfi.IsComplete()); - // remove block from map - cache()->DeRegisterPrefetchFile(this); - } } - UnMarkPrefetch(); + + if (!found) { + TRACEF(Dump, "File::Prefetch no free block found "); + m_stateCond.Lock(); + m_prefetchState = kComplete; + m_stateCond.UnLock(); + cache()->DeRegisterPrefetchFile(this); + } } @@ -945,24 +944,6 @@ float File::GetPrefetchScore() const return m_prefetchScore; } -//______________________________________________________________________________ -void File::MarkPrefetch() -{ - m_stateCond.Lock(); - m_prefetchCurrentCnt++; - m_stateCond.UnLock(); - -} - -//______________________________________________________________________________ -void File::UnMarkPrefetch() -{ - m_stateCond.Lock(); - m_prefetchCurrentCnt--; - m_stateCond.UnLock(); -} - - XrdOucTrace* File::GetTrace() { return Cache::GetInstance().GetTrace(); diff --git a/src/XrdFileCache/XrdFileCacheFile.hh b/src/XrdFileCache/XrdFileCacheFile.hh index 116cf737c9f..9f7c7d597bd 100644 --- a/src/XrdFileCache/XrdFileCacheFile.hh +++ b/src/XrdFileCache/XrdFileCacheFile.hh @@ -95,7 +95,7 @@ namespace XrdFileCache class File { private: - enum PrefetchState_e { kOn, kHold, kCanceled }; + enum PrefetchState_e { kOn, kHold, kStopped, kComplete }; XrdOucCacheIO2 *m_input; //!< original data source XrdOssDF *m_output; //!< file handle for data file on disk @@ -106,8 +106,6 @@ namespace XrdFileCache long long m_offset; //!< offset of cached file for block-based operation long long m_fileSize; //!< size of cached disk file for block-based operation - bool m_stopping; //!< run thread should be stopped - XrdSysCondVar m_stateCond; //!< state condition variable // fsync @@ -167,7 +165,7 @@ namespace XrdFileCache //! \brief Initiate close. Return true if still IO active. //! Used in XrdPosixXrootd::Close() //---------------------------------------------------------------------- - bool InitiateClose(); + bool ioActive(); //---------------------------------------------------------------------- //! Sync file cache inf o and output data with disk @@ -186,14 +184,16 @@ namespace XrdFileCache float GetPrefetchScore() const; - void MarkPrefetch(); - //! Log path const char* lPath() const; std::string GetLocalPath() { return m_temp_filename; } XrdOucTrace* GetTrace(); + + long long GetFileSize() { return m_fileSize;} + + void WakeUp(); private: bool overlap(int blk, // block to query long long blk_size, // @@ -223,8 +223,6 @@ namespace XrdFileCache void CheckPrefetchStatRAM(Block* b); void CheckPrefetchStatDisk(int idx); - void UnMarkPrefetch(); - void inc_ref_count(Block*); void dec_ref_count(Block*); void free_block(Block*); diff --git a/src/XrdFileCache/XrdFileCacheIOEntireFile.cc b/src/XrdFileCache/XrdFileCacheIOEntireFile.cc index 9429b987910..6734b51539e 100644 --- a/src/XrdFileCache/XrdFileCacheIOEntireFile.cc +++ b/src/XrdFileCache/XrdFileCacheIOEntireFile.cc @@ -42,13 +42,23 @@ IOEntireFile::IOEntireFile(XrdOucCacheIO2 *io, XrdOucCacheStats &stats, Cache & XrdCl::URL url(m_io->Path()); std::string fname = Cache::GetInstance().RefConfiguration().m_cache_dir + url.GetPath(); - if (!(m_file = Cache::GetInstance().GetFileWithLocalPath(fname, this))) + if ((m_file = Cache::GetInstance().GetFileWithLocalPath(fname, this))) { + m_file->WakeUp(); + } + else { + struct stat st; - Fstat(st); + int res = Fstat(st); + + // this should not happen, but make a printout to see it + if (res) + TRACEIO(Error, "IOEntireFile::IOEntireFile, could not get valid stat"); + m_file = new File(io, fname, 0, st.st_size); - Cache::GetInstance().AddActive(this, m_file); } + + Cache::GetInstance().AddActive(this, m_file); } @@ -64,53 +74,76 @@ int IOEntireFile::Fstat(struct stat &sbuff) std::string name = url.GetPath(); name += ".cinfo"; - struct stat* ls = getValidLocalStat(name.c_str()); - if (ls) { - memcpy(&sbuff, ls, sizeof(struct stat)); - return 0; - } - else { - return m_io->Fstat(sbuff); + int res = 0; + if(!m_localStat) { + res = initCachedStat(name.c_str()); + if (res) return res; } + + memcpy(&sbuff, m_localStat, sizeof(struct stat)); + return 0; +} + + +long long IOEntireFile::FSize() +{ + return m_file->GetFileSize(); } void IOEntireFile::RelinquishFile(File* f) { + TRACEIO(Info, "IOEntireFile::RelinquishFile"); assert(m_file == f); m_file = 0; } -struct stat* IOEntireFile::getValidLocalStat(const char* path) +int IOEntireFile::initCachedStat(const char* path) { - if (!m_localStat) { - struct stat tmpStat; - if (m_cache.GetOss()->Stat(path, &tmpStat) == XrdOssOK) { - XrdOssDF* infoFile = m_cache.GetOss()->newFile(Cache::GetInstance().RefConfiguration().m_username.c_str()); - XrdOucEnv myEnv; - int res = infoFile->Open(path, O_RDONLY, 0600, myEnv); - if (res >= 0) { - Info info(m_cache.GetTrace()); - if (info.Read(infoFile) > 0) { - tmpStat.st_size = info.GetFileSize(); - m_localStat = new struct stat; - memcpy(m_localStat, &tmpStat, sizeof(struct stat)); - } + // called indirectly from this constructor first time + + int res = -1; + struct stat tmpStat; + + if (m_cache.GetOss()->Stat(path, &tmpStat) == XrdOssOK) { + XrdOssDF* infoFile = m_cache.GetOss()->newFile(Cache::GetInstance().RefConfiguration().m_username.c_str()); + XrdOucEnv myEnv; + if (infoFile->Open(path, O_RDONLY, 0600, myEnv) > 0) { + Info info(m_cache.GetTrace()); + printf("reading info file ..\n"); + if (info.Read(infoFile) > 0) { + tmpStat.st_size = info.GetFileSize(); + TRACEIO(Info, "IOEntireFile::initCachedStat successfuly read size from info file = " << tmpStat.st_size); + res = 0; + } + else { + // file exist but can't read it + TRACEIO(Error, "IOEntireFile::initCachedStat failed to read file size from info file"); } - infoFile->Close(); - delete infoFile; } + infoFile->Close(); + delete infoFile; } - return m_localStat; + if (res) { + res = m_io->Fstat(tmpStat); + TRACEIO(Debug, "IOEntireFile::initCachedStat get stat from client res= " << res << "size = " << tmpStat.st_size); + } + + if (res == 0) + { + m_localStat = new struct stat; + memcpy(m_localStat, &tmpStat, sizeof(struct stat)); + } + return res; } bool IOEntireFile::ioActive() { - if (!m_file) + if ( ! m_file) return false; else - return m_file->InitiateClose(); + return m_file->ioActive(); } XrdOucCacheIO *IOEntireFile::Detach() @@ -130,11 +163,19 @@ void IOEntireFile::Read (XrdOucCacheIOCB &iocb, char *buff, long long offs, int int IOEntireFile::Read (char *buff, long long off, int size) { TRACEIO(Dump, "IOEntireFile::Read() "<< this << " off: " << off << " size: " << size ); + + // protect from reads over the file size + if (off >= FSize()) + return 0; if (off < 0) { errno = EINVAL; return -1; } + if (off + size > FSize()) + size = FSize() - off; + + ssize_t bytes_read = 0; ssize_t retval = 0; diff --git a/src/XrdFileCache/XrdFileCacheIOEntireFile.hh b/src/XrdFileCache/XrdFileCacheIOEntireFile.hh index e3e8fe9f1a8..927be01f281 100644 --- a/src/XrdFileCache/XrdFileCacheIOEntireFile.hh +++ b/src/XrdFileCache/XrdFileCacheIOEntireFile.hh @@ -97,12 +97,15 @@ namespace XrdFileCache virtual int Fstat(struct stat &sbuff); + virtual long long FSize(); + virtual void RelinquishFile(File*); + private: File* m_file; struct stat *m_localStat; - struct stat* getValidLocalStat(const char* path); + int initCachedStat(const char* path); }; } diff --git a/src/XrdFileCache/XrdFileCacheIOFileBlock.cc b/src/XrdFileCache/XrdFileCacheIOFileBlock.cc index cd30cbaca62..7fe0d3df8e1 100644 --- a/src/XrdFileCache/XrdFileCacheIOFileBlock.cc +++ b/src/XrdFileCache/XrdFileCacheIOFileBlock.cc @@ -135,7 +135,7 @@ bool IOFileBlock::ioActive() XrdSysMutexHelper lock(&m_mutex); for (std::map::iterator it = m_blocks.begin(); it != m_blocks.end(); ++it) { - if (it->second->InitiateClose()) + if (it->second->ioActive()) return true; } diff --git a/src/XrdFileCache/XrdFileCacheInfo.cc b/src/XrdFileCache/XrdFileCacheInfo.cc index cda22389e63..b9c2951d8ac 100644 --- a/src/XrdFileCache/XrdFileCacheInfo.cc +++ b/src/XrdFileCache/XrdFileCacheInfo.cc @@ -99,9 +99,13 @@ int Info::Read(XrdOssDF* fp) int off = 0; int version; - off += fp->Read(&version, off, sizeof(int)); + off = fp->Read(&version, off, sizeof(int)); + if (off <= 0) { + TRACE(Warning, "Info:::Read() failed"); + return 0; + } if (version != m_version) { - TRACE(Error, "Info:::Read(), incomatible file version"); + TRACE(Error, "Info:::Read(), incompatible file version"); return 0; } diff --git a/src/XrdFileCache/XrdFileCacheInfo.hh b/src/XrdFileCache/XrdFileCacheInfo.hh index 0652e83bdb8..80406c7f03a 100644 --- a/src/XrdFileCache/XrdFileCacheInfo.hh +++ b/src/XrdFileCache/XrdFileCacheInfo.hh @@ -178,7 +178,7 @@ namespace XrdFileCache //--------------------------------------------------------------------- //! Update complete status //--------------------------------------------------------------------- - void CheckComplete(); + void UpdateDownloadCompleteStatus(); //--------------------------------------------------------------------- //! Get number of accesses @@ -272,7 +272,7 @@ namespace XrdFileCache return false; } - inline void Info::CheckComplete() + inline void Info::UpdateDownloadCompleteStatus() { m_complete = !IsAnythingEmptyInRng(0, m_sizeInBits-1); } diff --git a/src/XrdFileCache/XrdFileCacheTrace.hh b/src/XrdFileCache/XrdFileCacheTrace.hh index 77e75353f69..979c87e3275 100644 --- a/src/XrdFileCache/XrdFileCacheTrace.hh +++ b/src/XrdFileCache/XrdFileCacheTrace.hh @@ -10,6 +10,14 @@ #define TRACE_Debug 4 #define TRACE_Dump 5 + +#define TRACE_STR_None "" +#define TRACE_STR_Error "error " +#define TRACE_STR_Warning "warning " +#define TRACE_STR_Info "info " +#define TRACE_STR_Debug "debug " +#define TRACE_STR_Dump "dump " + #ifndef NODEBUG #include "XrdSys/XrdSysHeaders.hh" @@ -21,15 +29,15 @@ #define TRACE(act, x) \ if (XRD_TRACE What >= TRACE_ ## act) \ - {XRD_TRACE Beg(m_traceID); cerr <= TRACE_ ## act) \ - {XRD_TRACE Beg(m_traceID); cerr <Path(); XRD_TRACE End();} + {XRD_TRACE Beg(m_traceID); cerr << TRACE_STR_##act <Path(); XRD_TRACE End();} #define TRACEF(act, x) \ if (XRD_TRACE What >= TRACE_ ## act) \ - {XRD_TRACE Beg(m_traceID); cerr <