Skip to content

Commit

Permalink
Handle case where File::Read() does not wait blocks to be processed.
Browse files Browse the repository at this point in the history
  • Loading branch information
alja authored and abh3 committed Jun 30, 2016
1 parent 155827c commit 6591d0c
Showing 1 changed file with 23 additions and 19 deletions.
42 changes: 23 additions & 19 deletions src/XrdFileCache/XrdFileCacheFile.cc
Expand Up @@ -130,7 +130,7 @@ File::~File()
BlockMap_i toErase = itr;
++itr;
m_block_map.erase(toErase);
// Relase RAM AMT !!!
cache()->RAMBlockReleased();
}
else {
++itr;
Expand Down Expand Up @@ -340,7 +340,7 @@ Block* File::RequestBlock(int i, bool prefetch)

Block *b = new Block(this, off, this_bs, prefetch); // should block be reused to avoid recreation
m_block_map[i] = b;
clLog()->Dump(XrdCl::AppMsg, "File::RequestBlock() %p idx=%d pOn=(%d) %s", (void*)b, i, prefetch, lPath());
clLog()->Dump(XrdCl::AppMsg, "File::RequestBlock() this = %p, b=%p, this idx=%d pOn=(%d) %s", (void*)this, (void*)b, i, prefetch, lPath());

client.Read(off, this_bs, (void*)b->get_buff(), new BlockResponseHandler(b));

Expand Down Expand Up @@ -450,9 +450,9 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
// XXXX Or just push it and handle errors in one place later?

inc_ref_count(bi->second);
clLog()->Dump(XrdCl::AppMsg, "File::Read() inc_ref_count for existing %d %s", block_idx, lPath());
clLog()->Dump(XrdCl::AppMsg, "File::Read() inc_ref_count for existing b=%p %d %s", (void*)bi->second, block_idx, lPath());
blks_to_process.push_front(bi->second);
m_stats.m_BytesRam++;
m_stats.m_BytesRam++; // AMT what if block fails
}
// On disk?
else if (m_cfi.TestBit(block_idx))
Expand Down Expand Up @@ -508,13 +508,14 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
}
else
{
bytes_read = rc; // AMT ?? should there be an immediate return
bytes_read = rc;
clLog()->Error(XrdCl::AppMsg, "File::Read() failed to read from disk.");
return rc;
// AMT commented line below should not be an immediate return, can have block refcount increased and map increased
// return rc;
}
}
// Third, loop over blocks that are available or incoming
while ( ! blks_to_process.empty() && bytes_read >= 0) // AMT : do I need this loop ?
while ( (! blks_to_process.empty()) && (bytes_read >= 0))
{
BlockList_t finished;

Expand All @@ -524,10 +525,10 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
BlockList_i bi = blks_to_process.begin();
while (bi != blks_to_process.end())
{
clLog()->Dump(XrdCl::AppMsg, "File::Read() searcing for blocks finished");
// clLog()->Dump(XrdCl::AppMsg, "File::Read() searcing for block %p finished", (void*)(*bi));
if ((*bi)->is_finished())
{
clLog()->Dump(XrdCl::AppMsg, "File::Read() found finished block");
clLog()->Dump(XrdCl::AppMsg, "File::Read() found finished block %p", (void*)(*bi));
finished.push_back(*bi);
BlockList_i bj = bi++;
blks_to_process.erase(bj);
Expand Down Expand Up @@ -560,7 +561,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
long long off_in_block; // offset in block
long long size_to_copy; // size to copy

clLog()->Dump(XrdCl::AppMsg, "File::Read() Block finished ok.");
// clLog()->Dump(XrdCl::AppMsg, "File::Read() Block finished ok.");
overlap((*bi)->m_offset/BS, BS, iUserOff, iUserSize, user_off, off_in_block, size_to_copy);
memcpy(&iUserBuff[user_off], &((*bi)->m_buff[off_in_block]), size_to_copy);
bytes_read += size_to_copy;
Expand All @@ -584,7 +585,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
// Fourth, make sure all direct requests have arrived
if (direct_handler != 0)
{
clLog()->Error(XrdCl::AppMsg, "File::Read() waiting for direct requests.");
clLog()->Debug(XrdCl::AppMsg, "File::Read() waiting for direct requests.");
XrdSysCondVarHelper _lck(direct_handler->m_cond);

if (direct_handler->m_to_wait > 0)
Expand Down Expand Up @@ -616,7 +617,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)

for (BlockList_i bi = blks_processed.begin(); bi != blks_processed.end(); ++bi)
{
clLog()->Dump(XrdCl::AppMsg, "File::Read() dec_ref_count %d %s", ((*bi)->m_offset/BufferSize()), lPath());
clLog()->Dump(XrdCl::AppMsg, "File::Read() dec_ref_count b=%p, %d %s", (void*)(*bi), ((*bi)->m_offset/BufferSize()), lPath());
dec_ref_count(*bi);
// XXXX stamp block
}
Expand Down Expand Up @@ -728,8 +729,8 @@ void File::Sync()
void File::inc_ref_count(Block* b)
{
// Method always called under lock
// clLog()->Dump(XrdCl::AppMsg, "File::inc_ref_count %d %s ",b->m_refcnt, lPath());
b->m_refcnt++;
clLog()->Dump(XrdCl::AppMsg, "File::inc_ref_count b=%p, %d %s ",(void*)b, b->m_refcnt, lPath());
}

//______________________________________________________________________________
Expand All @@ -741,11 +742,13 @@ void File::dec_ref_count(Block* b)
b-> m_refcnt--;
assert(b->m_refcnt >= 0);

if ( b->m_refcnt == 0) {
//AMT ... this is ugly, ... File::Read() can decrease ref count before waiting to be , prefetch starts with refcnt 0
if ( b->m_refcnt == 0 && b->is_finished()) {
int i = b->m_offset/BufferSize();
clLog()->Dump(XrdCl::AppMsg, "File::dec_ref_count erase block (%p) %d %s ", (void*)b, i, lPath());
delete m_block_map[i];
size_t ret = m_block_map.erase(i);
// AMT free ram counter
if (ret != 1) {
clLog()->Error(XrdCl::AppMsg, "File::OnBlockZeroRefCount did not erase %d from map.", i);
}
Expand All @@ -759,10 +762,10 @@ void File::dec_ref_count(Block* b)

void File::ProcessBlockResponse(Block* b, XrdCl::XRootDStatus *status)
{
clLog()->Debug(XrdCl::AppMsg, "File::ProcessBlockResponse %d %s",(int)(b->m_offset/BufferSize()), lPath());

m_downloadCond.Lock();

clLog()->Debug(XrdCl::AppMsg, "File::ProcessBlockResponse %p, %d %s",(void*)b,(int)(b->m_offset/BufferSize()), lPath());
if (status->IsOK())
{
b->m_downloaded = true;
Expand All @@ -777,7 +780,7 @@ void File::ProcessBlockResponse(Block* b, XrdCl::XRootDStatus *status)
{
// AMT how long to keep?
// when to retry?
clLog()->Error(XrdCl::AppMsg, "File::ProcessBlockResponse block %d error %s",(int)(b->m_offset/BufferSize()), lPath());
clLog()->Error(XrdCl::AppMsg, "File::ProcessBlockResponse block %p %d error %s",(void*)b,(int)(b->m_offset/BufferSize()), lPath());
XrdPosixMap::Result(*status);
// AMT could notfiy global cache we dont need RAM for that block
b->set_error_and_free(errno);
Expand Down Expand Up @@ -877,15 +880,15 @@ void File::Prefetch()
// remove block from map
cache()->DeRegisterPrefetchFile(this);
}
clLog()->Debug(XrdCl::AppMsg, "File::Prefetch end");
}
clLog()->Dump(XrdCl::AppMsg, "File::Prefetch end");
} /*
else if (m_block_map.size() >= 3) {
clLog()->Dump(XrdCl::AppMsg,"skip prefetch %s ", lPath());
for (BlockMap_i it = m_block_map.begin(); it != m_block_map.end(); ++it )
{
clLog()->Dump(XrdCl::AppMsg, "block idx = %d, ref_count = %d, prefetch=%d [%s]", it->first, it->second->m_refcnt, it->second->m_prefetch, lPath());
}
}
}*/
}

UnMarkPrefetch();
Expand Down Expand Up @@ -943,6 +946,7 @@ void BlockResponseHandler::HandleResponse(XrdCl::XRootDStatus *status,
XrdCl::AnyObject *response)
{
XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg,"BlockResponseHandler::HandleResponse()");

m_block->m_file->ProcessBlockResponse(m_block, status);

delete status;
Expand Down

0 comments on commit 6591d0c

Please sign in to comment.