Skip to content

Commit

Permalink
Keep failed blocks in map.
Browse files Browse the repository at this point in the history
  • Loading branch information
alja authored and osschar committed Mar 9, 2016
1 parent 133d5ea commit 7f16d37
Showing 1 changed file with 26 additions and 28 deletions.
54 changes: 26 additions & 28 deletions src/XrdFileCache/XrdFileCacheFile.cc
Expand Up @@ -90,7 +90,9 @@ m_prefetchScore(1),
m_prefetchCurrentCnt(0)
{
clLog()->Debug(XrdCl::AppMsg, "File::File() %s", m_input.Path());
Open();
if (!Open()) {
clLog()->Error(XrdCl::AppMsg, "File::File() Open failed %s !!!", m_input.Path());
}
}


Expand All @@ -105,7 +107,7 @@ File::~File()
m_stateCond.UnLock();
}

// cache()->RemoveWriteQEntriesFor(this);
cache()->RemoveWriteQEntriesFor(this);

clLog()->Info(XrdCl::AppMsg, "File::~File() check write queues ...%s", lPath());

Expand All @@ -118,6 +120,15 @@ File::~File()
if (isPrefetching == false)
{
m_downloadCond.Lock();
// remove failed blocks
for (BlockMap_i it = m_block_map.begin(); it != m_block_map.end();) {
if (it->second->is_finished() && it->second->m_refcnt == 1) // refcounf more than 1 if used by Read()
m_block_map.erase(it);
else
++it;
}


bool blockMapEmpty = m_block_map.empty();
int blocksize = (int)m_block_map.size();
m_downloadCond.UnLock();
Expand Down Expand Up @@ -197,7 +208,7 @@ bool File::InitiateClose()

bool File::Open()
{
// clLog()->Debug(XrdCl::AppMsg, "File::Open() open file for disk cache %s", m_input.Path());
clLog()->Dump(XrdCl::AppMsg, "File::Open() open file for disk cache %s", m_input.Path());

XrdOss &m_output_fs = *Factory::GetInstance().GetOss();
// Create the data file itself.
Expand Down Expand Up @@ -320,10 +331,10 @@ Block* File::RequestBlock(int i, bool prefetch)

Block *b = new Block(this, off, this_bs, prefetch); // should block be reused to avoid recreation
m_block_map[i] = b;
clLog()->Dump(XrdCl::AppMsg, "File::RequestBlock() %p idx=%d pOn=(%d) %s", (void*)b, i, prefetch, lPath());

client.Read(off, this_bs, (void*)b->get_buff(), new BlockResponseHandler(b));

clLog()->Debug(XrdCl::AppMsg, "RequestBlock() %d END", i);
return b;
}

Expand Down Expand Up @@ -446,8 +457,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
// Is there room for one more RAM Block?
if ( cache()->RequestRAMBlock())
{
clLog()->Debug(XrdCl::AppMsg, "File::Read() request block to fetch %d", block_idx);
clLog()->Dump(XrdCl::AppMsg, "File::Read() inc_ref_count new %d %s", block_idx, lPath());
clLog()->Dump(XrdCl::AppMsg, "File::Read() inc_ref_count new %d %s", block_idx, lPath());
Block *b = RequestBlock(block_idx, false);
inc_ref_count(b);
blks_to_process.push_back(b);
Expand Down Expand Up @@ -550,7 +560,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
}
else // it has failed ... krap up.
{
clLog()->Error(XrdCl::AppMsg, "File::Read() Block finished with eorror.");
clLog()->Error(XrdCl::AppMsg, "File::Read() Block finished with error.");
bytes_read = -1;
errno = (*bi)->m_errno;
break;
Expand Down Expand Up @@ -590,8 +600,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
{
XrdSysCondVarHelper _lck(m_downloadCond);

// XXXX stamp file
// AMT ??? fetched status stampled in WriteDisk callback , what dies stamp mean ??
// XXXX stamp file ???

// blks_to_process can be non-empty, if we're exiting with an error.
std::copy(blks_to_process.begin(), blks_to_process.end(), std::back_inserter(blks_processed));
Expand Down Expand Up @@ -650,7 +659,7 @@ void File::WriteBlockToDisk(Block* b)

{
XrdSysCondVarHelper _lck(m_downloadCond);
clLog()->Dump(XrdCl::AppMsg, "File::WriteToDisk() dec_ref_count %d %s", pfIdx, lPath());
// clLog()->Dump(XrdCl::AppMsg, "File::WriteToDisk() dec_ref_count %d %s", pfIdx, lPath());
dec_ref_count(b);
}

Expand All @@ -669,7 +678,7 @@ void File::WriteBlockToDisk(Block* b)
++m_non_flushed_cnt;
}

if (m_non_flushed_cnt >= 100)
if (m_non_flushed_cnt >= 100 && (m_cfi.IsComplete() && m_non_flushed_cnt > 0))
{
schedule_sync = true;
m_in_sync = true;
Expand Down Expand Up @@ -710,7 +719,7 @@ void File::Sync()
void File::inc_ref_count(Block* b)
{
// Method always called under lock
clLog()->Error(XrdCl::AppMsg, "File::inc_ref_count %d %s ",b->m_refcnt, lPath());
// clLog()->Dump(XrdCl::AppMsg, "File::inc_ref_count %d %s ",b->m_refcnt, lPath());
b->m_refcnt++;
}

Expand All @@ -720,12 +729,12 @@ void File::inc_ref_count(Block* b)
void File::dec_ref_count(Block* b)
{
// Method always called under lock
// clLog()->Error(XrdCl::AppMsg, "File::dec_ref_count %d %s ",b->m_refcnt, lPath());
b-> m_refcnt--;
assert(b->m_refcnt >= 0);

if ( b->m_refcnt == 0) {
int i = b->m_offset/BufferSize();
clLog()->Error(XrdCl::AppMsg, "File::dec_ref_count erase block (%p) %d %s ", (void*)b, i, lPath());
delete m_block_map[i];
size_t ret = m_block_map.erase(i);
if (ret != 1) {
Expand All @@ -741,7 +750,7 @@ void File::dec_ref_count(Block* b)

void File::ProcessBlockResponse(Block* b, XrdCl::XRootDStatus *status)
{
// clLog()->Debug(XrdCl::AppMsg, "File::ProcessBlockResponse %d %s",(int)(b->m_offset/BufferSize()), lPath());
clLog()->Debug(XrdCl::AppMsg, "File::ProcessBlockResponse %d %s",(int)(b->m_offset/BufferSize()), lPath());

m_downloadCond.Lock();

Expand All @@ -761,25 +770,14 @@ void File::ProcessBlockResponse(Block* b, XrdCl::XRootDStatus *status)
// when to retry?
clLog()->Error(XrdCl::AppMsg, "File::ProcessBlockResponse block %d error %s",(int)(b->m_offset/BufferSize()), lPath());
XrdPosixMap::Result(*status);

// AMT could notfiy global cache we dont need RAM for that block
b->set_error_and_free(errno);
errno = 0;

// ??? AMT temprary commented out -- throw away failed attempts
// inc_ref_count(b);
// ??? AMT how long to keep
inc_ref_count(b);
}


// case when there is a prefetch that is failed or prefetch that stopped has just been initated
if (b->m_refcnt == 0) {
assert(b->m_prefetch);
cache()->RAMBlockReleased();
int bidx = (int)(b->m_offset/BufferSize());
delete m_block_map[bidx];
m_block_map.erase(bidx);
}


m_downloadCond.Broadcast();

m_downloadCond.UnLock();
Expand Down

0 comments on commit 7f16d37

Please sign in to comment.