diff --git a/src/XrdFileCache/XrdFileCache.cc b/src/XrdFileCache/XrdFileCache.cc index 6e808c9b448..b2b678240c0 100644 --- a/src/XrdFileCache/XrdFileCache.cc +++ b/src/XrdFileCache/XrdFileCache.cc @@ -179,7 +179,7 @@ bool Cache::RequestRAMBlock() { XrdSysMutexHelper lock(&m_RAMblock_mutex); - if ( m_RAMblocks_used > Factory::GetInstance().RefConfiguration().m_NRamBuffers ) + if ( m_RAMblocks_used < Factory::GetInstance().RefConfiguration().m_NRamBuffers ) { m_RAMblocks_used++; return true; @@ -249,8 +249,10 @@ File* Cache::GetNextFileToPrefetch() { XrdSysMutexHelper lock(&m_prefetch_mutex); - if (m_files.empty()) + if (m_files.empty()) { + XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg, "Cache::GetNextFileToPrefetch ... no open files"); return 0; + } std::sort(m_files.begin(), m_files.end(), myobject); File* f = m_files.back(); @@ -264,14 +266,18 @@ Cache::GetNextFileToPrefetch() void Cache::Prefetch() { + XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg, "Cache::Prefetch thread start"); + while (true) { File* f = GetNextFileToPrefetch(); + XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg, "Cache::Prefetch got file (%p)", f); + if (f) { f->Prefetch(); } else { // wait for new file, AMT should I wait for the signal instead ??? - XrdSysTimer::Wait(10); + XrdSysTimer::Wait(1); } } } diff --git a/src/XrdFileCache/XrdFileCacheFactory.cc b/src/XrdFileCache/XrdFileCacheFactory.cc index 75c55f23c55..68611c5ddc3 100644 --- a/src/XrdFileCache/XrdFileCacheFactory.cc +++ b/src/XrdFileCache/XrdFileCacheFactory.cc @@ -310,6 +310,7 @@ bool Factory::Config(XrdSysLogger *logger, const char *config_filename, const ch bool Factory::ConfigParameters(std::string part, XrdOucStream& config ) { + printf("part %s \n", part.c_str()); XrdSysError err(0, ""); if ( part == "user" ) { @@ -366,7 +367,9 @@ bool Factory::ConfigParameters(std::string part, XrdOucStream& config ) } else if (part == "prefetch" ) { + printf("prefetch enabled !!!!\n"); m_configuration.m_prefetch = true; + config.GetWord(); } else if (part == "nram" ) { diff --git a/src/XrdFileCache/XrdFileCacheFile.cc b/src/XrdFileCache/XrdFileCacheFile.cc index 436c1813bcf..2bba3dbaa59 100644 --- a/src/XrdFileCache/XrdFileCacheFile.cc +++ b/src/XrdFileCache/XrdFileCacheFile.cc @@ -225,7 +225,7 @@ bool File::Open() return false; } - if (m_cfi.Read(m_infoFile) <= 0) + if (m_cfi.Read(m_infoFile, Factory::GetInstance().RefConfiguration().m_prefetch) <= 0) { int ss = (m_fileSize - 1)/m_cfi.GetBufferSize() + 1; clLog()->Info(XrdCl::AppMsg, "Creating new file info with size %lld. Reserve space for %d blocks %s", m_fileSize, ss, m_input.Path()); @@ -291,7 +291,7 @@ Block* File::RequestBlock(int i, bool prefetch) // // Reference count is 0 so increase it in calling function if you want to // catch the block while still in memory. - clLog()->Dump(XrdCl::AppMsg, "Request block %d ", i); + clLog()->Debug(XrdCl::AppMsg, "RequestBlock() %d pOn=(%d)", i, prefetch); XrdCl::File &client = ((XrdPosixFile*)(&m_input))->clFile; @@ -306,6 +306,7 @@ Block* File::RequestBlock(int i, bool prefetch) client.Read(off, this_bs, (void*)b->get_buff(), new BlockResponseHandler(b)); + clLog()->Debug(XrdCl::AppMsg, "RequestBlock() %d END", i); return b; } @@ -347,7 +348,7 @@ int File::ReadBlocksFromDisk(std::list& blocks, char* req_buf, long long req_off, long long req_size) { - clLog()->Dump(XrdCl::AppMsg, "ReadBlocksFromDisk %ld ", blocks.size()); + clLog()->Dump(XrdCl::AppMsg, "File::ReadBlocksFromDisk %ld ", blocks.size()); const long long BS = m_cfi.GetBufferSize(); long long total = 0; @@ -380,6 +381,8 @@ int File::ReadBlocksFromDisk(std::list& blocks, int File::Read(char* iUserBuff, long long iUserOff, int iUserSize) { + clLog()->Dump(XrdCl::AppMsg, "File::Read() begin "); + const long long BS = m_cfi.GetBufferSize(); // lock @@ -426,6 +429,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize) // Is there room for one more RAM Block? if ( cache()->RequestRAMBlock()) { + clLog()->Debug(XrdCl::AppMsg, "File::Read() request block to fetch %d", block_idx); Block *b = RequestBlock(block_idx, false); inc_ref_count(b); blks_to_process.push_back(b); @@ -434,6 +438,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize) // Nope ... read this directly without caching. else { + clLog()->Debug(XrdCl::AppMsg, "File::Read() direct block %d", block_idx); blks_direct.push_back(block_idx); } } @@ -479,10 +484,10 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize) BlockList_i bi = blks_to_process.begin(); while (bi != blks_to_process.end()) { - clLog()->Dump(XrdCl::AppMsg, "searcing for blocks finished"); + clLog()->Dump(XrdCl::AppMsg, "File::Read() searcing for blocks finished"); if ((*bi)->is_finished()) { - clLog()->Dump(XrdCl::AppMsg, "found finished block"); + clLog()->Dump(XrdCl::AppMsg, "File::Read() found finished block"); finished.push_back(*bi); BlockList_i bj = bi++; blks_to_process.erase(bj); @@ -496,11 +501,11 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize) if (finished.empty()) { - clLog()->Dump(XrdCl::AppMsg, "wait block begin"); + clLog()->Dump(XrdCl::AppMsg, "File::Read() wait block begin"); m_downloadCond.Wait(); - clLog()->Dump(XrdCl::AppMsg, "wait block end"); + clLog()->Dump(XrdCl::AppMsg, "File::Read() wait block end"); continue; } @@ -515,7 +520,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize) long long off_in_block; // offset in block long long size_to_copy; // size to copy - clLog()->Dump(XrdCl::AppMsg, "Block finished ok."); + clLog()->Dump(XrdCl::AppMsg, "File::Read() Block finished ok."); overlap((*bi)->m_offset/BS, BS, iUserOff, iUserSize, user_off, off_in_block, size_to_copy); memcpy(&iUserBuff[user_off], &((*bi)->m_buff[off_in_block]), size_to_copy); bytes_read += size_to_copy; @@ -524,7 +529,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize) } else // it has failed ... krap up. { - clLog()->Error(XrdCl::AppMsg, "Block finished with eorror."); + clLog()->Error(XrdCl::AppMsg, "File::Read() Block finished with eorror."); bytes_read = -1; errno = (*bi)->m_errno; break; @@ -539,7 +544,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize) // Fourth, make sure all direct requests have arrived if (direct_handler != 0) { - clLog()->Error(XrdCl::AppMsg, "Read() waiting for direct requests."); + clLog()->Error(XrdCl::AppMsg, "File::Read() waiting for direct requests."); XrdSysCondVarHelper _lck(direct_handler->m_cond); if (direct_handler->m_to_wait > 0) @@ -785,29 +790,40 @@ void File::Prefetch() if (!stopping) { + + clLog()->Dump(XrdCl::AppMsg, "Prefetch::Prefetch enter to check download status \n"); XrdSysCondVarHelper _lck(m_downloadCond); - if (m_cfi.IsComplete() == false) + clLog()->Dump(XrdCl::AppMsg, "Prefetch::Prefetch enter to check download status BEGIN \n"); + if (m_cfi.IsComplete() == false && m_block_map.size() < 1600) { - int block_idx = -1; + clLog()->Debug(XrdCl::AppMsg, "Prefetch::Prefetch begin, block size %ld", m_block_map.size()); + // check index not on disk and not in RAM - for (int f = 0; f < m_cfi.GetSizeInBits(); ++f) + bool found = false; + for (int f=0; f < m_cfi.GetSizeInBits(); ++f) { + clLog()->Dump(XrdCl::AppMsg, "Prefetch::Prefetch test bit %d", f); if (!m_cfi.TestBit(f)) { - BlockMap_i bi = m_block_map.find(block_idx); + BlockMap_i bi = m_block_map.find(f); if (bi == m_block_map.end()) { - block_idx = f; + clLog()->Dump(XrdCl::AppMsg, "Prefetch::Prefetch take block %d", f); + Block *b = RequestBlock(f, true); + inc_ref_count(b); + m_prefetchReadCnt++; + m_prefetchScore = m_prefetchHitCnt/m_prefetchReadCnt; + found = true; break; } } } - - if (cache()->RequestRAMBlock()) { - m_prefetchReadCnt++; - m_prefetchScore = m_prefetchHitCnt/m_prefetchReadCnt; - Block *b = RequestBlock(block_idx, true); - inc_ref_count(b); + if (!found) { + clLog()->Dump(XrdCl::AppMsg, "Prefetch::Prefetch no free blcok found "); + m_cfi.CheckComplete(); + if (m_cfi.IsComplete() == false) + clLog()->Dump(XrdCl::AppMsg, "Prefetch::Prefetch This shoulf not happedn !!!"); } + clLog()->Debug(XrdCl::AppMsg, "Prefetch::Prefetch end"); } } @@ -844,6 +860,8 @@ float File::GetPrefetchScore() const //______________________________________________________________________________ void File::MarkPrefetch() { + + XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg,"File::MarkPrefetch()"); m_stateCond.Lock(); m_prefetchCurrentCnt++; m_stateCond.UnLock(); @@ -865,6 +883,7 @@ void File::UnMarkPrefetch() void BlockResponseHandler::HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) { + XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg,"BlockResponseHandler::HandleResponse()"); m_block->m_file->ProcessBlockResponse(m_block, status); delete status; @@ -878,7 +897,7 @@ void BlockResponseHandler::HandleResponse(XrdCl::XRootDStatus *status, void DirectResponseHandler::HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) { - XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg,"Direct block response"); + XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg,"DirectResponseHandler::HandleRespons()"); XrdSysCondVarHelper _lck(m_cond); --m_to_wait; diff --git a/src/XrdFileCache/XrdFileCacheIOEntireFile.cc b/src/XrdFileCache/XrdFileCacheIOEntireFile.cc index 2a6c9d66e8c..7844f29d484 100644 --- a/src/XrdFileCache/XrdFileCacheIOEntireFile.cc +++ b/src/XrdFileCache/XrdFileCacheIOEntireFile.cc @@ -49,7 +49,7 @@ IOEntireFile::~IOEntireFile() bool IOEntireFile::ioActive() { - printf("called ioActive ...\n"); + printf("called ioActive ...\n"); return m_file->InitiateClose(); } diff --git a/src/XrdFileCache/XrdFileCacheInfo.cc b/src/XrdFileCache/XrdFileCacheInfo.cc index 9323e5748d7..b08d8cbc51b 100644 --- a/src/XrdFileCache/XrdFileCacheInfo.cc +++ b/src/XrdFileCache/XrdFileCacheInfo.cc @@ -56,14 +56,14 @@ Info::~Info() //______________________________________________________________________________ -void Info::ResizeBits(int s, bool prefetch_stat) +void Info::ResizeBits(int s, bool init_prefetch_buff) { m_sizeInBits = s; m_buff_fetched = (unsigned char*)malloc(GetSizeInBytes()); m_buff_write_called = (unsigned char*)malloc(GetSizeInBytes()); memset(m_buff_fetched, 0, GetSizeInBytes()); memset(m_buff_write_called, 0, GetSizeInBytes()); - if (prefetch_stat) { + if (init_prefetch_buff) { m_buff_prefetch = (unsigned char*)malloc(GetSizeInBytes()); memset(m_buff_prefetch, 0, GetSizeInBytes()); } @@ -72,7 +72,7 @@ void Info::ResizeBits(int s, bool prefetch_stat) //______________________________________________________________________________ -int Info::Read(XrdOssDF* fp) +int Info::Read(XrdOssDF* fp, bool init_prefetch_buff ) { // does not need lock, called only in File::Open // before File::Run() starts @@ -95,6 +95,13 @@ int Info::Read(XrdOssDF* fp) off += fp->Read(&m_accessCnt, off, sizeof(int)); clLog()->Dump(XrdCl::AppMsg, "Info:::Read() complete %d access_cnt %d", m_complete, m_accessCnt); + + + if (init_prefetch_buff) { + m_buff_prefetch = (unsigned char*)malloc(GetSizeInBytes()); + memset(m_buff_prefetch, 0, GetSizeInBytes()); + } + return off; } diff --git a/src/XrdFileCache/XrdFileCacheInfo.hh b/src/XrdFileCache/XrdFileCacheInfo.hh index e2508216820..adcbeb6a54d 100644 --- a/src/XrdFileCache/XrdFileCacheInfo.hh +++ b/src/XrdFileCache/XrdFileCacheInfo.hh @@ -100,7 +100,7 @@ namespace XrdFileCache //! //! @return number of bytes read //--------------------------------------------------------------------- - int Read(XrdOssDF* fp); + int Read(XrdOssDF* fp, bool init_prefetch = false); //--------------------------------------------------------------------- //! Write number of blocks and read buffer size