diff --git a/src/XrdFileCache/XrdFileCacheFile.cc b/src/XrdFileCache/XrdFileCacheFile.cc index bd862032911..72f46baee00 100644 --- a/src/XrdFileCache/XrdFileCacheFile.cc +++ b/src/XrdFileCache/XrdFileCacheFile.cc @@ -90,7 +90,9 @@ m_prefetchScore(1), m_prefetchCurrentCnt(0) { clLog()->Debug(XrdCl::AppMsg, "File::File() %s", m_input.Path()); - Open(); + if (!Open()) { + clLog()->Error(XrdCl::AppMsg, "File::File() Open failed %s !!!", m_input.Path()); + } } @@ -105,7 +107,7 @@ File::~File() m_stateCond.UnLock(); } - // cache()->RemoveWriteQEntriesFor(this); + cache()->RemoveWriteQEntriesFor(this); clLog()->Info(XrdCl::AppMsg, "File::~File() check write queues ...%s", lPath()); @@ -118,6 +120,15 @@ File::~File() if (isPrefetching == false) { m_downloadCond.Lock(); + // remove failed blocks + for (BlockMap_i it = m_block_map.begin(); it != m_block_map.end();) { + if (it->second->is_finished() && it->second->m_refcnt == 1) // refcounf more than 1 if used by Read() + m_block_map.erase(it); + else + ++it; + } + + bool blockMapEmpty = m_block_map.empty(); int blocksize = (int)m_block_map.size(); m_downloadCond.UnLock(); @@ -197,7 +208,7 @@ bool File::InitiateClose() bool File::Open() { - // clLog()->Debug(XrdCl::AppMsg, "File::Open() open file for disk cache %s", m_input.Path()); + clLog()->Dump(XrdCl::AppMsg, "File::Open() open file for disk cache %s", m_input.Path()); XrdOss &m_output_fs = *Factory::GetInstance().GetOss(); // Create the data file itself. @@ -320,10 +331,10 @@ Block* File::RequestBlock(int i, bool prefetch) Block *b = new Block(this, off, this_bs, prefetch); // should block be reused to avoid recreation m_block_map[i] = b; + clLog()->Dump(XrdCl::AppMsg, "File::RequestBlock() %p idx=%d pOn=(%d) %s", (void*)b, i, prefetch, lPath()); client.Read(off, this_bs, (void*)b->get_buff(), new BlockResponseHandler(b)); - clLog()->Debug(XrdCl::AppMsg, "RequestBlock() %d END", i); return b; } @@ -446,8 +457,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize) // Is there room for one more RAM Block? if ( cache()->RequestRAMBlock()) { - clLog()->Debug(XrdCl::AppMsg, "File::Read() request block to fetch %d", block_idx); - clLog()->Dump(XrdCl::AppMsg, "File::Read() inc_ref_count new %d %s", block_idx, lPath()); + clLog()->Dump(XrdCl::AppMsg, "File::Read() inc_ref_count new %d %s", block_idx, lPath()); Block *b = RequestBlock(block_idx, false); inc_ref_count(b); blks_to_process.push_back(b); @@ -550,7 +560,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize) } else // it has failed ... krap up. { - clLog()->Error(XrdCl::AppMsg, "File::Read() Block finished with eorror."); + clLog()->Error(XrdCl::AppMsg, "File::Read() Block finished with error."); bytes_read = -1; errno = (*bi)->m_errno; break; @@ -590,8 +600,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize) { XrdSysCondVarHelper _lck(m_downloadCond); - // XXXX stamp file - // AMT ??? fetched status stampled in WriteDisk callback , what dies stamp mean ?? + // XXXX stamp file ??? // blks_to_process can be non-empty, if we're exiting with an error. std::copy(blks_to_process.begin(), blks_to_process.end(), std::back_inserter(blks_processed)); @@ -650,7 +659,7 @@ void File::WriteBlockToDisk(Block* b) { XrdSysCondVarHelper _lck(m_downloadCond); - clLog()->Dump(XrdCl::AppMsg, "File::WriteToDisk() dec_ref_count %d %s", pfIdx, lPath()); + // clLog()->Dump(XrdCl::AppMsg, "File::WriteToDisk() dec_ref_count %d %s", pfIdx, lPath()); dec_ref_count(b); } @@ -669,7 +678,7 @@ void File::WriteBlockToDisk(Block* b) ++m_non_flushed_cnt; } - if (m_non_flushed_cnt >= 100) + if (m_non_flushed_cnt >= 100 && (m_cfi.IsComplete() && m_non_flushed_cnt > 0)) { schedule_sync = true; m_in_sync = true; @@ -710,7 +719,7 @@ void File::Sync() void File::inc_ref_count(Block* b) { // Method always called under lock - clLog()->Error(XrdCl::AppMsg, "File::inc_ref_count %d %s ",b->m_refcnt, lPath()); + // clLog()->Dump(XrdCl::AppMsg, "File::inc_ref_count %d %s ",b->m_refcnt, lPath()); b->m_refcnt++; } @@ -720,12 +729,12 @@ void File::inc_ref_count(Block* b) void File::dec_ref_count(Block* b) { // Method always called under lock - // clLog()->Error(XrdCl::AppMsg, "File::dec_ref_count %d %s ",b->m_refcnt, lPath()); b-> m_refcnt--; assert(b->m_refcnt >= 0); if ( b->m_refcnt == 0) { int i = b->m_offset/BufferSize(); + clLog()->Error(XrdCl::AppMsg, "File::dec_ref_count erase block (%p) %d %s ", (void*)b, i, lPath()); delete m_block_map[i]; size_t ret = m_block_map.erase(i); if (ret != 1) { @@ -741,7 +750,7 @@ void File::dec_ref_count(Block* b) void File::ProcessBlockResponse(Block* b, XrdCl::XRootDStatus *status) { - // clLog()->Debug(XrdCl::AppMsg, "File::ProcessBlockResponse %d %s",(int)(b->m_offset/BufferSize()), lPath()); + clLog()->Debug(XrdCl::AppMsg, "File::ProcessBlockResponse %d %s",(int)(b->m_offset/BufferSize()), lPath()); m_downloadCond.Lock(); @@ -761,25 +770,14 @@ void File::ProcessBlockResponse(Block* b, XrdCl::XRootDStatus *status) // when to retry? clLog()->Error(XrdCl::AppMsg, "File::ProcessBlockResponse block %d error %s",(int)(b->m_offset/BufferSize()), lPath()); XrdPosixMap::Result(*status); - + // AMT could notfiy global cache we dont need RAM for that block b->set_error_and_free(errno); errno = 0; - // ??? AMT temprary commented out -- throw away failed attempts - // inc_ref_count(b); + // ??? AMT how long to keep + inc_ref_count(b); } - - // case when there is a prefetch that is failed or prefetch that stopped has just been initated - if (b->m_refcnt == 0) { - assert(b->m_prefetch); - cache()->RAMBlockReleased(); - int bidx = (int)(b->m_offset/BufferSize()); - delete m_block_map[bidx]; - m_block_map.erase(bidx); - } - - m_downloadCond.Broadcast(); m_downloadCond.UnLock();