Skip to content

Commit

Permalink
Optimize detection of file available for prefetching.
Browse files Browse the repository at this point in the history
  • Loading branch information
alja authored and osschar committed Mar 9, 2016
1 parent 13c6e23 commit 4af1628
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 49 deletions.
3 changes: 0 additions & 3 deletions src/XrdFileCache/XrdFileCache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -258,9 +258,6 @@ Cache::GetNextFileToPrefetch()
size_t l = m_files.size();
int idx = rand() % l;
File* f = m_files[idx];

std::random_shuffle(m_files.begin(), m_files.end());
File* f = m_files.back();
f->MarkPrefetch();
m_prefetch_condVar.UnLock();
return f;
Expand Down
87 changes: 41 additions & 46 deletions src/XrdFileCache/XrdFileCacheFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ namespace XrdPosixGlobals
namespace
{
const int PREFETCH_MAX_ATTEMPTS = 10;
const size_t PREFETCH_MAX_BLOCKS=5;

class DiskSyncer : public XrdJob
{
private:
Expand Down Expand Up @@ -85,6 +87,7 @@ m_syncer(new DiskSyncer(this, "XrdFileCache::DiskSyncer")),
m_non_flushed_cnt(0),
m_in_sync(false),
m_downloadCond(0),
m_prefetchState(kOn),
m_prefetchReadCnt(0),
m_prefetchHitCnt(0),
m_prefetchScore(1),
Expand Down Expand Up @@ -213,7 +216,8 @@ bool File::InitiateClose()

m_stateCond.Lock();
m_stopping = true;
m_stateCond.UnLock();
m_stateCond.UnLock();
m_prefetchState = kCanceled;
if (m_cfi.IsComplete()) return false; // AMT maybe map size is here more meaningfull, but might hold block state lock
return true;
}
Expand Down Expand Up @@ -354,6 +358,10 @@ Block* File::RequestBlock(int i, bool prefetch)
if (status.IsOK()) {
clLog()->Dump(XrdCl::AppMsg, "File::RequestBlock() this = %p, b=%p, this idx=%d pOn=(%d) %s", (void*)this, (void*)b, i, prefetch, lPath());
m_block_map[i] = b;

if (m_prefetchState == kOn && m_block_map.size() > PREFETCH_MAX_BLOCKS) {
m_prefetchState = kHold;
}
return b;
}
else {
Expand Down Expand Up @@ -807,7 +815,6 @@ void File::inc_ref_count(Block* b)

//______________________________________________________________________________


void File::dec_ref_count(Block* b)
{
// Method always called under lock
Expand All @@ -832,6 +839,9 @@ void File::free_block(Block* b)
else {
cache()->RAMBlockReleased();
}

if (m_prefetchState == kHold && m_block_map.size() < PREFETCH_MAX_BLOCKS)
m_prefetchState = kOn;
}

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -919,57 +929,42 @@ void File::AppendIOStatToFileInfo()
//______________________________________________________________________________
void File::Prefetch()
{
bool stopping = false;
m_stateCond.Lock();
stopping = m_stopping;
m_stateCond.UnLock();


if (!stopping) {
if (m_prefetchState == kOn) {

// clLog()->Dump(XrdCl::AppMsg, "File::Prefetch enter to check download status \n");
// clLog()->Dump(XrdCl::AppMsg, "File::Prefetch enter to check download status \n");
XrdSysCondVarHelper _lck(m_downloadCond);
// clLog()->Dump(XrdCl::AppMsg, "File::Prefetch enter to check download status BEGIN %s \n", lPath());
if (m_cfi.IsComplete() == false && m_block_map.size() < 3)
{

// check index not on disk and not in RAM
bool found = false;
for (int f=0; f < m_cfi.GetSizeInBits(); ++f)
{
// clLog()->Dump(XrdCl::AppMsg, "File::Prefetch test bit %d", f);
if (!m_cfi.TestBit(f))
{
BlockMap_i bi = m_block_map.find(f);
if (bi == m_block_map.end()) {
clLog()->Dump(XrdCl::AppMsg, "File::Prefetch take block %d %s", f, lPath());
cache()->RequestRAMBlock();
RequestBlock(f, true);
m_prefetchReadCnt++;
m_prefetchScore = float(m_prefetchHitCnt)/m_prefetchReadCnt;
found = true;
break;
}
// check index not on disk and not in RAM
bool found = false;
for (int f=0; f < m_cfi.GetSizeInBits(); ++f)
{
// clLog()->Dump(XrdCl::AppMsg, "File::Prefetch test bit %d", f);
if (!m_cfi.TestBit(f))
{
BlockMap_i bi = m_block_map.find(f);
if (bi == m_block_map.end()) {
clLog()->Dump(XrdCl::AppMsg, "File::Prefetch take block %d %s", f, lPath());
cache()->RequestRAMBlock();
RequestBlock(f, true);
m_prefetchReadCnt++;
m_prefetchScore = float(m_prefetchHitCnt)/m_prefetchReadCnt;
found = true;
break;
}
}
if (!found) {
clLog()->Dump(XrdCl::AppMsg, "File::Prefetch no free blcok found ");
m_cfi.CheckComplete();
// assert (m_cfi.IsComplete());
// it is possible all missing blocks are in map but downlaoded status is still not complete
clLog()->Dump(XrdCl::AppMsg, "File::Prefetch -- unlikely to happen ... file seem to be complete %s", lPath());
// remove block from map
cache()->DeRegisterPrefetchFile(this);
}
clLog()->Dump(XrdCl::AppMsg, "File::Prefetch end");
} /*
else if (m_block_map.size() >= 3) {
clLog()->Dump(XrdCl::AppMsg,"skip prefetch %s ", lPath());
for (BlockMap_i it = m_block_map.begin(); it != m_block_map.end(); ++it )
{
clLog()->Dump(XrdCl::AppMsg, "block idx = %d, ref_count = %d, prefetch=%d [%s]", it->first, it->second->m_refcnt, it->second->m_prefetch, lPath());
}
}*/
}
if (!found) {
clLog()->Dump(XrdCl::AppMsg, "File::Prefetch no free blcok found ");
m_cfi.CheckComplete();
// assert (m_cfi.IsComplete());
// it is possible all missing blocks are in map but downlaoded status is still not complete
clLog()->Dump(XrdCl::AppMsg, "File::Prefetch -- unlikely to happen ... file seem to be complete %s", lPath());
// remove block from map
cache()->DeRegisterPrefetchFile(this);
}
clLog()->Dump(XrdCl::AppMsg, "File::Prefetch end");
}

UnMarkPrefetch();
Expand Down
4 changes: 4 additions & 0 deletions src/XrdFileCache/XrdFileCacheFile.hh
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ namespace XrdFileCache
class File
{
private:
enum PrefetchState_e { kOn, kHold, kCanceled };

XrdOucCacheIO &m_input; //!< original data source
XrdOssDF *m_output; //!< file handle for data file on disk
XrdOssDF *m_infoFile; //!< file handle for data-info file on disk
Expand Down Expand Up @@ -123,6 +125,8 @@ namespace XrdFileCache

Stats m_stats; //!< cache statistics, used in IO detach

PrefetchState_e m_prefetchState;

int m_prefetchReadCnt;
int m_prefetchHitCnt;
float m_prefetchScore; //cached
Expand Down

0 comments on commit 4af1628

Please sign in to comment.