Skip to content

Commit

Permalink
Remove m_stopped varaible. Add stopped prefetch state.
Browse files Browse the repository at this point in the history
  • Loading branch information
alja authored and abh3 committed Jun 30, 2016
1 parent 4a61855 commit 3ffa820
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 50 deletions.
94 changes: 47 additions & 47 deletions src/XrdFileCache/XrdFileCacheFile.cc
Expand Up @@ -79,7 +79,6 @@ m_cfi(Cache::GetInstance().GetTrace(), Cache::GetInstance().RefConfiguration().m
m_temp_filename(disk_file_path),
m_offset(iOffset),
m_fileSize(iFileSize),
m_stopping(false),
m_stateCond(0), // We will explicitly lock the condition before use.
m_syncer(new DiskSyncer(this, "XrdFileCache::DiskSyncer")),
m_non_flushed_cnt(0),
Expand Down Expand Up @@ -141,10 +140,9 @@ bool File::ioActive()
TRACEF(Debug, "File::Initiate close start");

m_stateCond.Lock();
if (!m_stopping) {
m_prefetchState = kCanceled;
cache()->DeRegisterPrefetchFile(this);
m_stopping = true;
if (m_prefetchState != kStopped) {
m_prefetchState = kStopped;
cache()->DeRegisterPrefetchFile(this);
}
m_stateCond.UnLock();

Expand Down Expand Up @@ -253,6 +251,11 @@ bool File::Open()
{
int res = m_cfi.Read(m_infoFile);
TRACEF(Debug, "Reading existing info file bytes = " << res);
m_downloadCond.Lock();
// this method is called from constructor, no need to lock downloadStaus
bool complete = m_cfi.IsComplete();
if (complete) m_prefetchState = kComplete;

}
else {
m_fileSize = m_fileSize;
Expand All @@ -270,7 +273,7 @@ bool File::Open()
// this should be a rare case wher FD can't be created
return false;
}
cache()->RegisterPrefetchFile(this);
if (m_prefetchState != kComplete) cache()->RegisterPrefetchFile(this);
return true;
}

Expand Down Expand Up @@ -814,17 +817,9 @@ void File::ProcessBlockResponse(Block* b, int res)
if (res >= 0)
{
b->m_downloaded = true;
TRACEF(Dump, "File::ProcessBlockResponse " << (int)(b->m_offset/BufferSize()) << " finished " << b->is_finished());
if (!m_stopping) { // AMT theoretically this should be under state lock, but then are double locks
TRACEF(Dump, "File::ProcessBlockResponse inc_ref_count " << (int)(b->m_offset/BufferSize()));
inc_ref_count(b);
cache()->AddWriteTask(b, true);
}
else {
// there is no refcount +/- to remove dropped prefetched blocks on destruction
if (b->m_prefetch && (b->m_refcnt == 0))
free_block(b);
}
TRACEF(Dump, "File::ProcessBlockResponse inc_ref_count " << (int)(b->m_offset/BufferSize()));
inc_ref_count(b);
cache()->AddWriteTask(b, true);
}
else
{
Expand Down Expand Up @@ -885,42 +880,47 @@ void File::AppendIOStatToFileInfo()
//______________________________________________________________________________
void File::Prefetch()
{
if (m_prefetchState == kOn)
{
TRACEF(Dump, "File::Prefetch enter to check download status");
XrdSysCondVarHelper _lck(m_stateCond);
if (m_prefetchState == kComplete ) {
cache()->DeRegisterPrefetchFile(this);
return;
}
else if (m_prefetchState == kHold || m_prefetchState == kStopped)
return;
}

// check index not on disk and not in RAM
TRACEF(Dump, "File::Prefetch enter to check download status");
bool found = false;
for (int f=0; f < m_cfi.GetSizeInBits(); ++f)
{
XrdSysCondVarHelper _lck(m_downloadCond);
// clLog()->Dump(XrdCl::AppMsg, "File::Prefetch enter to check download status BEGIN %s \n", lPath());

// check index not on disk and not in RAM
bool found = false;
for (int f=0; f < m_cfi.GetSizeInBits(); ++f)
{
// clLog()->Dump(XrdCl::AppMsg, "File::Prefetch test bit %d", f);
if (!m_cfi.TestBit(f))
{
f += m_offset/m_cfi.GetBufferSize();
BlockMap_i bi = m_block_map.find(f);
if (bi == m_block_map.end()) {
TRACEF(Dump, "File::Prefetch take block " << f);
cache()->RequestRAMBlock();
RequestBlock(f, true);
m_prefetchReadCnt++;
m_prefetchScore = float(m_prefetchHitCnt)/m_prefetchReadCnt;
found = true;
break;
}
if (!m_cfi.TestBit(f))
{
f += m_offset/m_cfi.GetBufferSize();
BlockMap_i bi = m_block_map.find(f);
if (bi == m_block_map.end()) {
TRACEF(Dump, "File::Prefetch take block " << f);
cache()->RequestRAMBlock();
RequestBlock(f, true);
m_prefetchReadCnt++;
m_prefetchScore = float(m_prefetchHitCnt)/m_prefetchReadCnt;
found = true;
break;
}
}
if (!found) {
TRACEF(Dump, "File::Prefetch no free block found ");
m_cfi.UpdateDownloadCompleteStatus();
// it is possible all missing blocks are in map but downlaoded status is still not complete
// assert (m_cfi.IsComplete());
// remove block from map
cache()->DeRegisterPrefetchFile(this);
}
}


if (!found) {
TRACEF(Dump, "File::Prefetch no free block found ");
m_stateCond.Lock();
m_prefetchState = kComplete;
m_stateCond.UnLock();
cache()->DeRegisterPrefetchFile(this);
}

UnMarkPrefetch();
}

Expand Down
4 changes: 1 addition & 3 deletions src/XrdFileCache/XrdFileCacheFile.hh
Expand Up @@ -95,7 +95,7 @@ namespace XrdFileCache
class File
{
private:
enum PrefetchState_e { kOn, kHold, kCanceled };
enum PrefetchState_e { kOn, kHold, kStopped, kComplete };

XrdOucCacheIO2 *m_input; //!< original data source
XrdOssDF *m_output; //!< file handle for data file on disk
Expand All @@ -106,8 +106,6 @@ namespace XrdFileCache
long long m_offset; //!< offset of cached file for block-based operation
long long m_fileSize; //!< size of cached disk file for block-based operation

bool m_stopping; //!< run thread should be stopped

XrdSysCondVar m_stateCond; //!< state condition variable

// fsync
Expand Down

0 comments on commit 3ffa820

Please sign in to comment.