From 3ffa8209e1d9a34b6fa81324386c27ac02a9656a Mon Sep 17 00:00:00 2001 From: Alja Mrak-Tadel Date: Thu, 9 Jun 2016 14:21:46 -0700 Subject: [PATCH] Remove m_stopped varaible. Add stopped prefetch state. --- src/XrdFileCache/XrdFileCacheFile.cc | 94 ++++++++++++++-------------- src/XrdFileCache/XrdFileCacheFile.hh | 4 +- 2 files changed, 48 insertions(+), 50 deletions(-) diff --git a/src/XrdFileCache/XrdFileCacheFile.cc b/src/XrdFileCache/XrdFileCacheFile.cc index 5159dc398de..f2537de9377 100644 --- a/src/XrdFileCache/XrdFileCacheFile.cc +++ b/src/XrdFileCache/XrdFileCacheFile.cc @@ -79,7 +79,6 @@ m_cfi(Cache::GetInstance().GetTrace(), Cache::GetInstance().RefConfiguration().m m_temp_filename(disk_file_path), m_offset(iOffset), m_fileSize(iFileSize), -m_stopping(false), m_stateCond(0), // We will explicitly lock the condition before use. m_syncer(new DiskSyncer(this, "XrdFileCache::DiskSyncer")), m_non_flushed_cnt(0), @@ -141,10 +140,9 @@ bool File::ioActive() TRACEF(Debug, "File::Initiate close start"); m_stateCond.Lock(); - if (!m_stopping) { - m_prefetchState = kCanceled; - cache()->DeRegisterPrefetchFile(this); - m_stopping = true; + if (m_prefetchState != kStopped) { + m_prefetchState = kStopped; + cache()->DeRegisterPrefetchFile(this); } m_stateCond.UnLock(); @@ -253,6 +251,11 @@ bool File::Open() { int res = m_cfi.Read(m_infoFile); TRACEF(Debug, "Reading existing info file bytes = " << res); + m_downloadCond.Lock(); + // this method is called from constructor, no need to lock downloadStaus + bool complete = m_cfi.IsComplete(); + if (complete) m_prefetchState = kComplete; + } else { m_fileSize = m_fileSize; @@ -270,7 +273,7 @@ bool File::Open() // this should be a rare case wher FD can't be created return false; } - cache()->RegisterPrefetchFile(this); + if (m_prefetchState != kComplete) cache()->RegisterPrefetchFile(this); return true; } @@ -814,17 +817,9 @@ void File::ProcessBlockResponse(Block* b, int res) if (res >= 0) { b->m_downloaded = true; - TRACEF(Dump, "File::ProcessBlockResponse " << (int)(b->m_offset/BufferSize()) << " finished " << b->is_finished()); - if (!m_stopping) { // AMT theoretically this should be under state lock, but then are double locks - TRACEF(Dump, "File::ProcessBlockResponse inc_ref_count " << (int)(b->m_offset/BufferSize())); - inc_ref_count(b); - cache()->AddWriteTask(b, true); - } - else { - // there is no refcount +/- to remove dropped prefetched blocks on destruction - if (b->m_prefetch && (b->m_refcnt == 0)) - free_block(b); - } + TRACEF(Dump, "File::ProcessBlockResponse inc_ref_count " << (int)(b->m_offset/BufferSize())); + inc_ref_count(b); + cache()->AddWriteTask(b, true); } else { @@ -885,42 +880,47 @@ void File::AppendIOStatToFileInfo() //______________________________________________________________________________ void File::Prefetch() { - if (m_prefetchState == kOn) { - TRACEF(Dump, "File::Prefetch enter to check download status"); + XrdSysCondVarHelper _lck(m_stateCond); + if (m_prefetchState == kComplete ) { + cache()->DeRegisterPrefetchFile(this); + return; + } + else if (m_prefetchState == kHold || m_prefetchState == kStopped) + return; + } + + // check index not on disk and not in RAM + TRACEF(Dump, "File::Prefetch enter to check download status"); + bool found = false; + for (int f=0; f < m_cfi.GetSizeInBits(); ++f) + { XrdSysCondVarHelper _lck(m_downloadCond); - // clLog()->Dump(XrdCl::AppMsg, "File::Prefetch enter to check download status BEGIN %s \n", lPath()); - - // check index not on disk and not in RAM - bool found = false; - for (int f=0; f < m_cfi.GetSizeInBits(); ++f) - { - // clLog()->Dump(XrdCl::AppMsg, "File::Prefetch test bit %d", f); - if (!m_cfi.TestBit(f)) - { - f += m_offset/m_cfi.GetBufferSize(); - BlockMap_i bi = m_block_map.find(f); - if (bi == m_block_map.end()) { - TRACEF(Dump, "File::Prefetch take block " << f); - cache()->RequestRAMBlock(); - RequestBlock(f, true); - m_prefetchReadCnt++; - m_prefetchScore = float(m_prefetchHitCnt)/m_prefetchReadCnt; - found = true; - break; - } + if (!m_cfi.TestBit(f)) + { + f += m_offset/m_cfi.GetBufferSize(); + BlockMap_i bi = m_block_map.find(f); + if (bi == m_block_map.end()) { + TRACEF(Dump, "File::Prefetch take block " << f); + cache()->RequestRAMBlock(); + RequestBlock(f, true); + m_prefetchReadCnt++; + m_prefetchScore = float(m_prefetchHitCnt)/m_prefetchReadCnt; + found = true; + break; } } - if (!found) { - TRACEF(Dump, "File::Prefetch no free block found "); - m_cfi.UpdateDownloadCompleteStatus(); - // it is possible all missing blocks are in map but downlaoded status is still not complete - // assert (m_cfi.IsComplete()); - // remove block from map - cache()->DeRegisterPrefetchFile(this); - } } + + if (!found) { + TRACEF(Dump, "File::Prefetch no free block found "); + m_stateCond.Lock(); + m_prefetchState = kComplete; + m_stateCond.UnLock(); + cache()->DeRegisterPrefetchFile(this); + } + UnMarkPrefetch(); } diff --git a/src/XrdFileCache/XrdFileCacheFile.hh b/src/XrdFileCache/XrdFileCacheFile.hh index d838fbafc88..413b82cb415 100644 --- a/src/XrdFileCache/XrdFileCacheFile.hh +++ b/src/XrdFileCache/XrdFileCacheFile.hh @@ -95,7 +95,7 @@ namespace XrdFileCache class File { private: - enum PrefetchState_e { kOn, kHold, kCanceled }; + enum PrefetchState_e { kOn, kHold, kStopped, kComplete }; XrdOucCacheIO2 *m_input; //!< original data source XrdOssDF *m_output; //!< file handle for data file on disk @@ -106,8 +106,6 @@ namespace XrdFileCache long long m_offset; //!< offset of cached file for block-based operation long long m_fileSize; //!< size of cached disk file for block-based operation - bool m_stopping; //!< run thread should be stopped - XrdSysCondVar m_stateCond; //!< state condition variable // fsync