Skip to content

Commit

Permalink
Unregister file from cache prefetch when it is complete.
Browse files Browse the repository at this point in the history
  • Loading branch information
alja authored and abh3 committed Jun 30, 2016
1 parent 71ef3c0 commit c2628fe
Showing 1 changed file with 21 additions and 9 deletions.
30 changes: 21 additions & 9 deletions src/XrdFileCache/XrdFileCacheFile.cc
Expand Up @@ -398,8 +398,6 @@ int File::ReadBlocksFromDisk(std::list<int>& blocks,

int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
{
clLog()->Dump(XrdCl::AppMsg, "File::Read() begin ");

const long long BS = m_cfi.GetBufferSize();

// lock
Expand Down Expand Up @@ -432,6 +430,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
// XXXX Or just push it and handle errors in one place later?

inc_ref_count(bi->second);
clLog()->Dump(XrdCl::AppMsg, "File::Read() inc_ref_count for existing %d %s", block_idx, lPath());
blks_to_process.push_front(bi->second);
m_stats.m_BytesRam++;
}
Expand All @@ -448,6 +447,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
if ( cache()->RequestRAMBlock())
{
clLog()->Debug(XrdCl::AppMsg, "File::Read() request block to fetch %d", block_idx);
clLog()->Dump(XrdCl::AppMsg, "File::Read() inc_ref_count new %d %s", block_idx, lPath());
Block *b = RequestBlock(block_idx, false);
inc_ref_count(b);
blks_to_process.push_back(b);
Expand Down Expand Up @@ -598,6 +598,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)

for (BlockList_i bi = blks_processed.begin(); bi != blks_processed.end(); ++bi)
{
clLog()->Dump(XrdCl::AppMsg, "File::Read() dec_ref_count %d %s", ((*bi)->m_offset/BufferSize()), lPath());
dec_ref_count(*bi);
// XXXX stamp block
}
Expand Down Expand Up @@ -649,6 +650,7 @@ void File::WriteBlockToDisk(Block* b)

{
XrdSysCondVarHelper _lck(m_downloadCond);
clLog()->Dump(XrdCl::AppMsg, "File::WriteToDisk() dec_ref_count %d %s", pfIdx, lPath());
dec_ref_count(b);
}

Expand Down Expand Up @@ -718,7 +720,7 @@ void File::inc_ref_count(Block* b)
void File::dec_ref_count(Block* b)
{
// Method always called under lock
clLog()->Error(XrdCl::AppMsg, "File::dec_ref_count %d %s ",b->m_refcnt, lPath());
// clLog()->Error(XrdCl::AppMsg, "File::dec_ref_count %d %s ",b->m_refcnt, lPath());
b-> m_refcnt--;
assert(b->m_refcnt >= 0);

Expand All @@ -739,7 +741,7 @@ void File::dec_ref_count(Block* b)

void File::ProcessBlockResponse(Block* b, XrdCl::XRootDStatus *status)
{
clLog()->Debug(XrdCl::AppMsg, "File::ProcessBlockResponse %d %s",(int)(b->m_offset/BufferSize()), lPath());
// clLog()->Debug(XrdCl::AppMsg, "File::ProcessBlockResponse %d %s",(int)(b->m_offset/BufferSize()), lPath());

m_downloadCond.Lock();

Expand All @@ -748,6 +750,7 @@ void File::ProcessBlockResponse(Block* b, XrdCl::XRootDStatus *status)
b->m_downloaded = true;
clLog()->Debug(XrdCl::AppMsg, "File::ProcessBlockResponse %d finished %d %s",(int)(b->m_offset/BufferSize()), b->is_finished(), lPath());
if (!m_stopping) { // AMT theoretically this should be under state lock, but then are double locks
clLog()->Debug(XrdCl::AppMsg, "File::ProcessBlockResponse inc_ref_count %d %s\n", (int)(b->m_offset/BufferSize()), lPath());
inc_ref_count(b);
cache()->AddWriteTask(b, true);
}
Expand Down Expand Up @@ -835,7 +838,7 @@ void File::Prefetch()

// clLog()->Dump(XrdCl::AppMsg, "File::Prefetch enter to check download status \n");
XrdSysCondVarHelper _lck(m_downloadCond);
clLog()->Dump(XrdCl::AppMsg, "File::Prefetch enter to check download status BEGIN \n");
clLog()->Dump(XrdCl::AppMsg, "File::Prefetch enter to check download status BEGIN %s \n", lPath());
if (m_cfi.IsComplete() == false && m_block_map.size() < 3)
{

Expand All @@ -851,7 +854,6 @@ void File::Prefetch()
clLog()->Dump(XrdCl::AppMsg, "File::Prefetch take block %d", f);
cache()->RequestRAMBlock();
RequestBlock(f, true);
/// inc_ref_count(b); AMT don't increase it, there is no-one to annulate it 0
m_prefetchReadCnt++;
m_prefetchScore = m_prefetchHitCnt/m_prefetchReadCnt;
found = true;
Expand All @@ -862,11 +864,21 @@ void File::Prefetch()
if (!found) {
clLog()->Dump(XrdCl::AppMsg, "File::Prefetch no free blcok found ");
m_cfi.CheckComplete();
if (m_cfi.IsComplete() == false)
clLog()->Dump(XrdCl::AppMsg, "File::Prefetch This shoulf not happedn !!!");
// assert (m_cfi.IsComplete());
// it is possible all missing blocks are in map but downlaoded status is still not complete
clLog()->Dump(XrdCl::AppMsg, "File::Prefetch -- unlikely to happen ... file seem to be complete %s", lPath());
// remove block from map
cache()->DeRegisterPrefetchFile(this);
}
clLog()->Debug(XrdCl::AppMsg, "File::Prefetch end");
}
else if (m_block_map.size() >= 3) {
clLog()->Dump(XrdCl::AppMsg,"skip prefetch %s ", lPath());
for (BlockMap_i it = m_block_map.begin(); it != m_block_map.end(); ++it )
{
clLog()->Dump(XrdCl::AppMsg, "block idx = %d, ref_count = %d, prefetch=%d [%s]", it->first, it->second->m_refcnt, it->second->m_prefetch, lPath());
}
}
}

UnMarkPrefetch();
Expand Down Expand Up @@ -924,7 +936,7 @@ void BlockResponseHandler::HandleResponse(XrdCl::XRootDStatus *status,
XrdCl::AnyObject *response)
{
XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg,"BlockResponseHandler::HandleResponse()");
m_block->m_file->ProcessBlockResponse(m_block, status);
m_block->m_file->ProcessBlockResponse(m_block, status);

delete status;
delete response;
Expand Down

0 comments on commit c2628fe

Please sign in to comment.