diff --git a/src/XrdFileCache/XrdFileCacheFile.cc b/src/XrdFileCache/XrdFileCacheFile.cc index ee922f3bc58..d25cfafcc14 100644 --- a/src/XrdFileCache/XrdFileCacheFile.cc +++ b/src/XrdFileCache/XrdFileCacheFile.cc @@ -74,10 +74,8 @@ m_temp_filename(disk_file_path), m_offset(iOffset), m_fileSize(iFileSize), -m_started(false), m_failed(false), m_stopping(false), -m_stopped(false), m_stateCond(0), // We will explicitly lock the condition before use. m_syncer(new DiskSyncer(this, "XrdFileCache::DiskSyncer")), @@ -85,9 +83,9 @@ m_non_flushed_cnt(0), m_in_sync(false), m_block_cond(0) -// m_num_reads(0) { clLog()->Debug(XrdCl::AppMsg, "File::File() %s", m_input.Path()); + Open(); } File::~File() @@ -100,16 +98,20 @@ File::~File() while (true) { m_stateCond.Lock(); - bool isStopped = m_stopped; + bool isStopped = m_stopping; m_stateCond.UnLock(); if (isStopped) { - // TODO AMT: wait for map to clear - break; + printf("~FILE map size %ld \n", m_block_map.size()); + if ( m_block_map.empty()) + break; } XrdSysTimer::Wait(100); } clLog()->Debug(XrdCl::AppMsg, "File::~File finished with writing %s",lPath() ); + + + // Wait disk sync bool do_sync = false; { XrdSysMutexHelper _lck(&m_syncStatusMutex); @@ -147,11 +149,10 @@ bool File::InitiateClose() { // Retruns true if delay is needed clLog()->Debug(XrdCl::AppMsg, "File::Initiate close start", lPath()); - if (m_cfi.IsComplete()) return false; m_stateCond.Lock(); - if (m_started == false) return false; m_stopping = true; m_stateCond.UnLock(); + if (m_cfi.IsComplete()) return false; // AMT maybe map size is here more meaningfull return true; } @@ -272,6 +273,7 @@ Block* File::RequestBlock(int i) // // Reference count is 0 so increase it in calling function if you want to // catch the block while still in memory. + clLog()->Dump(XrdCl::AppMsg, "Request block %d ", i); XrdCl::File &client = ((XrdPosixFile*)(&m_input))->clFile; @@ -295,6 +297,8 @@ Block* File::RequestBlock(int i) int File::RequestBlocksDirect(DirectResponseHandler *handler, IntList_t& blocks, char* req_buf, long long req_off, long long req_size) { + + clLog()->Dump(XrdCl::AppMsg, "RequestBlockDirect %ld ", blocks.size()); XrdCl::File &client = ((XrdPosixFile*)(&m_input))->clFile; const long long BS = m_cfi.GetBufferSize(); @@ -325,6 +329,8 @@ int File::RequestBlocksDirect(DirectResponseHandler *handler, IntList_t& blocks, int File::ReadBlocksFromDisk(std::list& blocks, char* req_buf, long long req_off, long long req_size) { + + clLog()->Dump(XrdCl::AppMsg, "ReadBlocksFromDisk %ld ", blocks.size()); const long long BS = m_cfi.GetBufferSize(); long long total = 0; @@ -427,7 +433,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize) DirectResponseHandler *direct_handler = 0; int direct_size = 0; - if ( ! blks_direct.empty()) + if (!blks_direct.empty()) { direct_handler = new DirectResponseHandler(blks_direct.size()); @@ -436,45 +442,54 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize) } // Second, read blocks from disk. - int rc = ReadBlocksFromDisk(blks_on_disk, iUserBuff, iUserOff, iUserSize); - if (rc >= 0) - { - bytes_read += rc; + if (!blks_on_disk.empty()) { + int rc = ReadBlocksFromDisk(blks_on_disk, iUserBuff, iUserOff, iUserSize); + if (rc >= 0) + { + bytes_read += rc; + } + else + { + bytes_read = rc; // AMT ?? should there be an immediate return + } } - else - { - bytes_read = rc; // AMT ?? should there be an immediate return - } - // Third, loop over blocks that are available or incoming - while ( ! blks_to_process.empty() && bytes_read >= 0) + while ( ! blks_to_process.empty() && bytes_read >= 0) // AMT : do I need this loop ? { - BlockList_t finished; + BlockList_t finished; - { - XrdSysCondVarHelper _lck(m_block_cond); + { + XrdSysCondVarHelper _lck(m_block_cond); - BlockList_i bi = blks_to_process.begin(); - while (bi != blks_to_process.end()) - { - if ((*bi)->is_finished()) - { - finished.push_back(*bi); - BlockList_i bj = bi++; - blks_to_process.erase(bj); - } - else - { - ++bi; - } - } + BlockList_i bi = blks_to_process.begin(); + while (bi != blks_to_process.end()) + { + clLog()->Dump(XrdCl::AppMsg, "searcing for blocks finished"); + if ((*bi)->is_finished()) + { + clLog()->Dump(XrdCl::AppMsg, "found finished block"); + finished.push_back(*bi); + BlockList_i bj = bi++; + blks_to_process.erase(bj); + } + else + { + ++bi; + } + } - if (finished.empty()) - { - m_block_cond.Wait(); - continue; - } - } + if (finished.empty()) + { + + clLog()->Dump(XrdCl::AppMsg, "wait block begin"); + + m_block_cond.Wait(); + + clLog()->Dump(XrdCl::AppMsg, "wait block end"); + + continue; + } + } BlockList_i bi = finished.begin(); while (bi != finished.end()) @@ -486,23 +501,30 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize) long long off_in_block; // offset in block long long size_to_copy; // size to copy + clLog()->Dump(XrdCl::AppMsg, "Block finished ok."); overlap((*bi)->m_offset/BS, BS, iUserOff, iUserSize, user_off, off_in_block, size_to_copy); memcpy(&iUserBuff[user_off], &((*bi)->m_buff[off_in_block]), size_to_copy); + bytes_read += size_to_copy; } else // it has failed ... krap up. { + + clLog()->Error(XrdCl::AppMsg, "Block finished with eorror."); bytes_read = -1; errno = (*bi)->m_errno; break; } + ++bi; } std::copy(finished.begin(), finished.end(), std::back_inserter(blks_processed)); + finished.clear(); } // Fourth, make sure all direct requests have arrived if (direct_handler != 0) { + clLog()->Error(XrdCl::AppMsg, "Read() waiting for direct requests."); XrdSysCondVarHelper _lck(direct_handler->m_cond); if (direct_handler->m_to_wait > 0) @@ -572,7 +594,7 @@ void File::WriteBlockToDisk(Block* b) } if (cnt > PREFETCH_MAX_ATTEMPTS) { - clLog()->Error(XrdCl::AppMsg, "File::WriteToDisk() write failes too manny attempts %s", lPath()); + clLog()->Error(XrdCl::AppMsg, "File::WriteToDisk() write failed too manny attempts %s", lPath()); return; } } @@ -663,6 +685,7 @@ void BlockResponseHandler::HandleResponse(XrdCl::XRootDStatus *status, void DirectResponseHandler::HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) { + XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg,"Direct block response"); XrdSysCondVarHelper _lck(m_cond); --m_to_wait; @@ -709,6 +732,8 @@ void File::dec_ref_count(Block* b) void File::ProcessBlockResponse(Block* b, XrdCl::XRootDStatus *status) { + clLog()->Dump(XrdCl::AppMsg, "File::ProcessBlockResponse %d ",(int)(b->m_offset/BufferSize())); + m_block_cond.Lock(); if (status->IsOK()) diff --git a/src/XrdFileCache/XrdFileCacheFile.hh b/src/XrdFileCache/XrdFileCacheFile.hh index 5b713a2e0d9..d5b31aa5954 100644 --- a/src/XrdFileCache/XrdFileCacheFile.hh +++ b/src/XrdFileCache/XrdFileCacheFile.hh @@ -96,10 +96,9 @@ namespace XrdFileCache long long m_offset; //!< offset of cached file for block-based operation long long m_fileSize; //!< size of cached disk file for block-based operation - bool m_started; //!< state of run thread bool m_failed; //!< reading from original source or writing to disk has failed bool m_stopping; //!< run thread should be stopped - bool m_stopped; //!< prefetch is stopped + XrdSysCondVar m_stateCond; //!< state condition variable XrdSysMutex m_downloadStatusMutex; //!< mutex locking access to m_cfi object