From 8caf77ff5d95deabe2d4d3ea5ccdb95ddd37fb4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matev=C5=BE=20Tadel?= Date: Mon, 6 Jun 2022 23:40:51 -0700 Subject: [PATCH] [Pfc] Implement async read and readV from the perspective of XrdOucCacheIO. (#1717) Implement async Read, pgRead, and ReadV. Mark file as complete when final block is successfully written to disk. Perform sync when file is completely downloaded, do not wait for final detach. Adjust default block size down to 128 kB to be closer to async default of 64 kB. Rename class IOEntireFile to IOFile. IO: Use override for virtual methods. File: Cache some frequently used variables in adjacent locations. Info: Coalesce functions SetBufferSize/FileSizeAndCreationTime() into a single function as this all needs to happen at the same time. Put pfc.blocksize into env XRDPFC.SEGSIZE. --- src/XrdPfc.cmake | 3 +- src/XrdPfc/XrdPfc.cc | 12 +- src/XrdPfc/XrdPfcCommand.cc | 3 +- src/XrdPfc/XrdPfcConfiguration.cc | 6 +- src/XrdPfc/XrdPfcFile.cc | 766 ++++++++++-------- src/XrdPfc/XrdPfcFile.hh | 278 ++++--- src/XrdPfc/XrdPfcIO.cc | 3 +- src/XrdPfc/XrdPfcIO.hh | 37 +- src/XrdPfc/XrdPfcIOEntireFile.cc | 201 ----- src/XrdPfc/XrdPfcIOFile.cc | 349 ++++++++ ...{XrdPfcIOEntireFile.hh => XrdPfcIOFile.hh} | 53 +- src/XrdPfc/XrdPfcIOFileBlock.cc | 33 +- src/XrdPfc/XrdPfcIOFileBlock.hh | 16 +- src/XrdPfc/XrdPfcInfo.cc | 20 +- src/XrdPfc/XrdPfcInfo.hh | 24 +- src/XrdPfc/XrdPfcStats.hh | 7 + src/XrdPfc/XrdPfcVRead.cc | 419 ---------- 17 files changed, 1098 insertions(+), 1132 deletions(-) delete mode 100644 src/XrdPfc/XrdPfcIOEntireFile.cc create mode 100644 src/XrdPfc/XrdPfcIOFile.cc rename src/XrdPfc/{XrdPfcIOEntireFile.hh => XrdPfcIOFile.hh} (70%) delete mode 100644 src/XrdPfc/XrdPfcVRead.cc diff --git a/src/XrdPfc.cmake b/src/XrdPfc.cmake index b5ab5c4a1cb..52fca1839c0 100644 --- a/src/XrdPfc.cmake +++ b/src/XrdPfc.cmake @@ -23,11 +23,10 @@ add_library( XrdPfc/XrdPfcPurge.cc XrdPfc/XrdPfcCommand.cc XrdPfc/XrdPfcFile.cc 
XrdPfc/XrdPfcFile.hh - XrdPfc/XrdPfcVRead.cc XrdPfc/XrdPfcStats.hh XrdPfc/XrdPfcInfo.cc XrdPfc/XrdPfcInfo.hh XrdPfc/XrdPfcIO.cc XrdPfc/XrdPfcIO.hh - XrdPfc/XrdPfcIOEntireFile.cc XrdPfc/XrdPfcIOEntireFile.hh + XrdPfc/XrdPfcIOFile.cc XrdPfc/XrdPfcIOFile.hh XrdPfc/XrdPfcIOFileBlock.cc XrdPfc/XrdPfcIOFileBlock.hh XrdPfc/XrdPfcDecision.hh) diff --git a/src/XrdPfc/XrdPfc.cc b/src/XrdPfc/XrdPfc.cc index 731d69dbf30..516da4240bf 100644 --- a/src/XrdPfc/XrdPfc.cc +++ b/src/XrdPfc/XrdPfc.cc @@ -38,7 +38,7 @@ #include "XrdPfc.hh" #include "XrdPfcTrace.hh" #include "XrdPfcInfo.hh" -#include "XrdPfcIOEntireFile.hh" +#include "XrdPfcIOFile.hh" #include "XrdPfcIOFileBlock.hh" using namespace XrdPfc; @@ -219,18 +219,18 @@ XrdOucCacheIO *Cache::Attach(XrdOucCacheIO *io, int Options) } else { - IOEntireFile *ioef = new IOEntireFile(io, *this); + IOFile *iof = new IOFile(io, *this); - if ( ! ioef->HasFile()) + if ( ! iof->HasFile()) { - delete ioef; + delete iof; // TODO - redirect instead. But this is kind of an awkward place for it. - // errno is set during IOEntireFile construction. + // errno is set during IOFile construction. TRACE(Error, tpfx << "Failed opening local file, falling back to remote access " << io->Path()); return io; } - cio = ioef; + cio = iof; } TRACE_PC(Debug, const char* loc = io->Location(), tpfx << io->Path() << " location: " << diff --git a/src/XrdPfc/XrdPfcCommand.cc b/src/XrdPfc/XrdPfcCommand.cc index 3a1bc0cf1b0..60d0ca3f19b 100644 --- a/src/XrdPfc/XrdPfcCommand.cc +++ b/src/XrdPfc/XrdPfcCommand.cc @@ -233,8 +233,7 @@ void Cache::ExecuteCommandUrl(const std::string& command_url) // Fill up cinfo. 
Info myInfo(m_trace, false); - myInfo.SetBufferSize(block_size); - myInfo.SetFileSizeAndCreationTime(file_size); + myInfo.SetBufferSizeFileSizeAndCreationTime(block_size, file_size); myInfo.SetAllBitsSynced(); for (int i = 0; i < at_count; ++i) diff --git a/src/XrdPfc/XrdPfcConfiguration.cc b/src/XrdPfc/XrdPfcConfiguration.cc index 328ab7f755f..2d78ada8ad1 100644 --- a/src/XrdPfc/XrdPfcConfiguration.cc +++ b/src/XrdPfc/XrdPfcConfiguration.cc @@ -36,7 +36,7 @@ Configuration::Configuration() : m_accHistorySize(20), m_dirStatsMaxDepth(-1), m_dirStatsStoreDepth(0), - m_bufferSize(256*1024), + m_bufferSize(128*1024), m_RamAbsAvailable(0), m_RamKeepStdBlocks(0), m_wqueue_blocks(16), @@ -303,7 +303,7 @@ bool Cache::Config(const char *config_filename, const char *parameters) // Adjust default parameters for client/serverless caching if (m_isClient) { - m_configuration.m_bufferSize = 256 * 1024; + m_configuration.m_bufferSize = 128 * 1024; // same as normal. m_configuration.m_wqueue_blocks = 8; m_configuration.m_wqueue_threads = 1; } @@ -532,6 +532,8 @@ bool Cache::Config(const char *config_filename, const char *parameters) } m_log.Say(buff); + + m_env->Put("XRDPFC.SEGSIZE", std::to_string(m_configuration.m_bufferSize).c_str()); } // Derived settings diff --git a/src/XrdPfc/XrdPfcFile.cc b/src/XrdPfc/XrdPfcFile.cc index 973a412790a..7901ef879c3 100644 --- a/src/XrdPfc/XrdPfcFile.cc +++ b/src/XrdPfc/XrdPfcFile.cc @@ -52,8 +52,6 @@ const char *File::m_traceID = "File"; File::File(const std::string& path, long long iOffset, long long iFileSize) : m_ref_cnt(0), - m_is_open(false), - m_in_shutdown(false), m_data_file(0), m_info_file(0), m_cfi(Cache::GetInstance().GetTrace(), Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks > 0), @@ -64,14 +62,16 @@ File::File(const std::string& path, long long iOffset, long long iFileSize) : m_ios_in_detach(0), m_non_flushed_cnt(0), m_in_sync(false), + m_detach_time_logged(false), + m_in_shutdown(false), m_state_cond(0), + 
m_block_size(0), + m_num_blocks(0), m_prefetch_state(kOff), m_prefetch_read_cnt(0), m_prefetch_hit_cnt(0), - m_prefetch_score(1), - m_detach_time_logged(false) -{ -} + m_prefetch_score(0) +{} File::~File() { @@ -156,7 +156,7 @@ Stats File::DeltaStatsFromLastCall() void File::BlockRemovedFromWriteQ(Block* b) { - TRACEF(Dump, "BlockRemovedFromWriteQ() block = " << (void*) b << " idx= " << b->m_offset/m_cfi.GetBufferSize()); + TRACEF(Dump, "BlockRemovedFromWriteQ() block = " << (void*) b << " idx= " << b->m_offset/m_block_size); XrdSysCondVarHelper _lck(m_state_cond); dec_ref_count(b); @@ -196,12 +196,6 @@ bool File::ioActive(IO *io) { XrdSysCondVarHelper _lck(m_state_cond); - if ( ! m_is_open) - { - TRACEF(Error, "ioActive for io " << io <<" called on a closed file. This should not happen."); - return false; - } - IoMap_i mi = m_io_map.find(io); if (mi != m_io_map.end()) @@ -272,7 +266,7 @@ bool File::FinalizeSyncBeforeExit() // This method is called after corresponding IO is detached from PosixCache. XrdSysCondVarHelper _lck(m_state_cond); - if (m_is_open && ! m_in_shutdown) + if ( ! m_in_shutdown) { if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged) { @@ -374,12 +368,6 @@ bool File::Open() TRACEF(Dump, tpfx << "open file for disk cache"); - if (m_is_open) - { - TRACEF(Error, tpfx << "file is already opened."); - return true; - } - const Configuration &conf = Cache::GetInstance().RefConfiguration(); XrdOss &myOss = * Cache::GetInstance().GetOss(); @@ -464,16 +452,16 @@ bool File::Open() m_cfi.ResetAllAccessStats(); m_data_file->Ftruncate(0); } else { - // If a file is complete, we don't really need to reset net cksums ... well, maybe next time. + // TODO: If the file is complete, we don't need to reset net cksums. 
m_cfi.DowngradeCkSumState(conf.get_cs_Chk()); } } if (initialize_info_file) { - m_cfi.SetBufferSize(conf.m_bufferSize); - m_cfi.SetFileSizeAndCreationTime(m_file_size); + m_cfi.SetBufferSizeFileSizeAndCreationTime(conf.m_bufferSize, m_file_size); m_cfi.SetCkSumState(conf.get_cs_Chk()); + m_cfi.ResetNoCkSumTime(); m_cfi.Write(m_info_file, ifn.c_str()); m_info_file->Fsync(); TRACEF(Debug, tpfx << "Creating new file info, data size = " << m_file_size << " num blocks = " << m_cfi.GetNBlocks()); @@ -481,7 +469,8 @@ bool File::Open() m_cfi.WriteIOStatAttach(); m_state_cond.Lock(); - m_is_open = true; + m_block_size = m_cfi.GetBufferSize(); + m_num_blocks = m_cfi.GetNBlocks(); m_prefetch_state = (m_cfi.IsComplete()) ? kComplete : kStopped; // Will engage in AddIO(). m_state_cond.UnLock(); @@ -500,7 +489,7 @@ bool File::overlap(int blk, // block to query // output: long long &off, // offset in user buffer long long &blk_off, // offset in block - long long &size) // size to copy + int &size) // size to copy { const long long beg = blk * blk_size; const long long end = beg + blk_size; @@ -513,7 +502,7 @@ bool File::overlap(int blk, // block to query off = ovlp_beg - req_off; blk_off = ovlp_beg - beg; - size = ovlp_end - ovlp_beg; + size = (int) (ovlp_end - ovlp_beg); assert(size <= blk_size); return true; @@ -526,17 +515,16 @@ bool File::overlap(int blk, // block to query //------------------------------------------------------------------------------ -Block* File::PrepareBlockRequest(int i, IO *io, bool prefetch) +Block* File::PrepareBlockRequest(int i, IO *io, void *req_id, bool prefetch) { - // Must be called w/ block_map locked. + // Must be called w/ state_cond locked. // Checks on size etc should be done before. // // Reference count is 0 so increase it in calling function if you want to // catch the block while still in memory. 
- const long long BS = m_cfi.GetBufferSize(); - const long long off = i * BS; - const int last_block = m_cfi.GetNBlocks() - 1; + const long long off = i * m_block_size; + const int last_block = m_num_blocks - 1; const bool cs_net = cache()->RefConfiguration().is_cschk_net(); int blk_size, req_size; @@ -544,7 +532,7 @@ Block* File::PrepareBlockRequest(int i, IO *io, bool prefetch) blk_size = req_size = m_file_size - off; if (cs_net && req_size & 0xFFF) req_size = (req_size & ~0xFFF) + 0x1000; } else { - blk_size = req_size = BS; + blk_size = req_size = m_block_size; } Block *b = 0; @@ -552,7 +540,7 @@ Block* File::PrepareBlockRequest(int i, IO *io, bool prefetch) if (buf) { - b = new (std::nothrow) Block(this, io, buf, off, blk_size, req_size, prefetch, cs_net); + b = new (std::nothrow) Block(this, io, req_id, buf, off, blk_size, req_size, prefetch, cs_net); if (b) { @@ -603,338 +591,337 @@ void File::ProcessBlockRequests(BlockList_t& blks) //------------------------------------------------------------------------------ -int File::RequestBlocksDirect(IO *io, DirectResponseHandler *handler, IntList_t& blocks, - char* req_buf, long long req_off, long long req_size) +void File::RequestBlocksDirect(IO *io, DirectResponseHandler *handler, std::vector& ioVec, int expected_size) { - const long long BS = m_cfi.GetBufferSize(); + TRACEF(Dump, "RequestBlocksDirect issuing ReadV for n_chunks = " << (int) ioVec.size() << ", total_size = " << expected_size); - // TODO Use readv to load more at the same time. 
+ io->GetInput()->ReadV( *handler, ioVec.data(), (int) ioVec.size()); +} - long long total = 0; +//------------------------------------------------------------------------------ - for (IntList_i ii = blocks.begin(); ii != blocks.end(); ++ii) - { - // overlap and request - long long off; // offset in user buffer - long long blk_off; // offset in block - long long size; // size to copy +int File::ReadBlocksFromDisk(std::vector& ioVec, int expected_size) +{ + TRACEF(Dump, "ReadBlocksFromDisk issuing ReadV for n_chunks = " << (int) ioVec.size() << ", total_size = " << expected_size); - overlap(*ii, BS, req_off, req_size, off, blk_off, size); + long long rs = m_data_file->ReadV(ioVec.data(), (int) ioVec.size()); - io->GetInput()->Read( *handler, req_buf + off, *ii * BS + blk_off, size); - TRACEF(Dump, "RequestBlockDirect success, idx = " << *ii << " size = " << size); + if (rs < 0) + { + TRACEF(Error, "ReadBlocksFromDisk neg retval = " << rs); + return rs; + } - total += size; + if (rs != expected_size) + { + TRACEF(Error, "ReadBlocksFromDisk incomplete size = " << rs); + return -EIO; } - return total; + return (int) rs; } //------------------------------------------------------------------------------ -int File::ReadBlocksFromDisk(std::list& blocks, - char* req_buf, long long req_off, long long req_size) +int File::Read(IO *io, char* iUserBuff, long long iUserOff, int iUserSize, ReadReqRH *rh) { - TRACEF(Dump, "ReadBlocksFromDisk " << blocks.size()); - const long long BS = m_cfi.GetBufferSize(); + // rrc_func is ONLY called from async processing. + // If this function returns anything other than -EWOULDBLOCK, rrc_func needs to be called by the caller. + // This streamlines implementation of synchronous IO::Read(). - long long total = 0; + TRACEF(Dump, "Read sid: " << Xrd::hex1 << rh->m_seq_id << " size: " << iUserSize); - // Coalesce adjacent reads. 
+ m_state_cond.Lock(); - for (IntList_i ii = blocks.begin(); ii != blocks.end(); ++ii) + if (m_in_shutdown) { - // overlap and read - long long off; // offset in user buffer - long long blk_off; // offset in block - long long size; // size to copy - - overlap(*ii, BS, req_off, req_size, off, blk_off, size); - - long long rs = m_data_file->Read(req_buf + off, *ii * BS + blk_off -m_offset, size); - TRACEF(Dump, "ReadBlocksFromDisk block idx = " << *ii << " size= " << size); - - if (rs < 0) - { - TRACEF(Error, "ReadBlocksFromDisk neg retval = " << rs << " idx = " << *ii ); - return rs; - } + m_state_cond.UnLock(); + return -ENOENT; + } - if (rs != size) - { - TRACEF(Error, "ReadBlocksFromDisk incomplete size = " << rs << " idx = " << *ii); - return -EIO; - } + // Shortcut -- file is fully downloaded. - total += rs; + if (m_cfi.IsComplete()) + { + m_state_cond.UnLock(); + int ret = m_data_file->Read(iUserBuff, iUserOff, iUserSize); + if (ret > 0) m_stats.AddBytesHit(ret); + return ret; } - return total; + XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } ); + + return ReadOpusCoalescere(io, &readV, 1, rh, "Read() "); } //------------------------------------------------------------------------------ -int File::Read(IO *io, char* iUserBuff, long long iUserOff, int iUserSize) +int File::ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh) { - const long long BS = m_cfi.GetBufferSize(); - - Stats loc_stats; - - BlockList_t blks; - - const int idx_first = iUserOff / BS; - const int idx_last = (iUserOff + iUserSize - 1) / BS; - - BlockSet_t requested_blocks; - BlockList_t blks_to_request, blks_to_process, blks_processed; - IntList_t blks_on_disk, blks_direct; - - // lock - // loop over reqired blocks: - // - if on disk, ok; - // - if in ram or incoming, inc ref-count - // - if not available, request and inc ref count before requesting the - // hell and more (esp. for sparse readvs). - // assess if passing the req to client is actually better. 
- // unlock + TRACEF(Dump, "ReadV for " << readVnum << " chunks."); m_state_cond.Lock(); - if ( ! m_is_open) - { - m_state_cond.UnLock(); - TRACEF(Error, "Read file is not open"); - return io->GetInput()->Read(iUserBuff, iUserOff, iUserSize); - } if (m_in_shutdown) { m_state_cond.UnLock(); return -ENOENT; } - for (int block_idx = idx_first; block_idx <= idx_last; ++block_idx) - { - TRACEF(Dump, "Read() idx " << block_idx); - BlockMap_i bi = m_block_map.find(block_idx); + // Shortcut -- file is fully downloaded. - // In RAM or incoming? - if (bi != m_block_map.end()) - { - inc_ref_count(bi->second); - TRACEF(Dump, "Read() " << (void*) iUserBuff << "inc_ref_count for existing block " << bi->second << " idx = " << block_idx); - blks_to_process.push_front(bi->second); - } - // On disk? - else if (m_cfi.TestBitWritten(offsetIdx(block_idx))) - { - TRACEF(Dump, "Read() read from disk " << (void*)iUserBuff << " idx = " << block_idx); - blks_on_disk.push_back(block_idx); - } - // Then we have to get it ... - else - { - // Is there room for one more RAM Block? - Block *b = PrepareBlockRequest(block_idx, io, false); - if (b) - { - TRACEF(Dump, "Read() inc_ref_count new " << (void*)iUserBuff << " idx = " << block_idx); - inc_ref_count(b); - blks_to_process.push_back(b); - blks_to_request.push_back(b); - requested_blocks.insert(b); - } - // Nope ... read this directly without caching. 
- else - { - TRACEF(Dump, "Read() direct block " << block_idx); - blks_direct.push_back(block_idx); - } - } + if (m_cfi.IsComplete()) + { + m_state_cond.UnLock(); + int ret = m_data_file->ReadV(const_cast(readV), readVnum); + if (ret > 0) m_stats.AddBytesHit(ret); + return ret; } - m_state_cond.UnLock(); + return ReadOpusCoalescere(io, readV, readVnum, rh, "ReadV() "); +} - ProcessBlockRequests(blks_to_request); +//------------------------------------------------------------------------------ - long long bytes_read = 0; - int error_cond = 0; // to be set to -errno +int File::ReadOpusCoalescere(IO *io, const XrdOucIOVec *readV, int readVnum, + ReadReqRH *rh, const char *tpfx) +{ + // Non-trivial processing for Read and ReadV. + // Entered under lock. + // + // loop over required blocks: + // - if on disk, ok; + // - if in ram or incoming, inc ref-count + // - otherwise request and inc ref count (unless RAM full => request direct) + // unlock - // First, send out any direct requests. - // TODO Could send them all out in a single vector read. - DirectResponseHandler *direct_handler = 0; - int direct_size = 0; + int prefetch_cnt = 0; - if ( ! blks_direct.empty()) - { - direct_handler = new DirectResponseHandler(blks_direct.size()); + ReadRequest *read_req = nullptr; + BlockList_t blks_to_request; // blocks we are issuing a new remote request for - direct_size = RequestBlocksDirect(io, direct_handler, blks_direct, iUserBuff, iUserOff, iUserSize); + std::unordered_map> blks_ready; - TRACEF(Dump, "Read() direct read requests sent out, size = " << direct_size); - } + std::vector iovec_disk; + std::vector iovec_direct; + int iovec_disk_total = 0; + int iovec_direct_total = 0; - // Second, read blocks from disk. - if ( ! 
blks_on_disk.empty() && bytes_read >= 0) + for (int iov_idx = 0; iov_idx < readVnum; ++iov_idx) { - int rc = ReadBlocksFromDisk(blks_on_disk, iUserBuff, iUserOff, iUserSize); - TRACEF(Dump, "Read() " << (void*)iUserBuff <<" from disk finished size = " << rc); - if (rc >= 0) - { - bytes_read += rc; - loc_stats.m_BytesHit += rc; - } - else - { - error_cond = rc; - TRACEF(Error, "Read() failed read from disk"); - } - } + const XrdOucIOVec &iov = readV[iov_idx]; + long long iUserOff = iov.offset; + int iUserSize = iov.size; + char *iUserBuff = iov.data; - // Third, loop over blocks that are available or incoming - int prefetchHitsRam = 0; - while ( ! blks_to_process.empty()) - { - BlockList_t finished; - BlockList_t to_reissue; + const int idx_first = iUserOff / m_block_size; + const int idx_last = (iUserOff + iUserSize - 1) / m_block_size; + + TRACEF(Dump, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx_first: " << idx_first << " idx_last: " << idx_last); + + enum LastBlock_e { LB_other, LB_disk, LB_direct }; + + LastBlock_e lbe = LB_other; + + for (int block_idx = idx_first; block_idx <= idx_last; ++block_idx) { - XrdSysCondVarHelper _lck(m_state_cond); + TRACEF(Dump, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx: " << block_idx); + BlockMap_i bi = m_block_map.find(block_idx); + + // overlap and read + long long off; // offset in user buffer + long long blk_off; // offset in block + int size; // size to copy - BlockList_i bi = blks_to_process.begin(); - while (bi != blks_to_process.end()) + overlap(block_idx, m_block_size, iUserOff, iUserSize, off, blk_off, size); + + // In RAM or incoming? 
+ if (bi != m_block_map.end()) { - if ((*bi)->is_failed() && (*bi)->get_io() != io) - { - TRACEF(Info, "Read() requested block " << (void*)(*bi) << " failed with another io " << - (*bi)->get_io() << " - reissuing request with my io " << io); + inc_ref_count(bi->second); + TRACEF(Dump, tpfx << (void*) iUserBuff << " inc_ref_count for existing block " << bi->second << " idx = " << block_idx); - (*bi)->reset_error_and_set_io(io); - to_reissue.push_back(*bi); - ++bi; - } - else if ((*bi)->is_finished()) + if (bi->second->is_finished()) { - TRACEF(Dump, "Read() requested block finished " << (void*)(*bi) << ", is_failed()=" << (*bi)->is_failed()); - finished.push_back(*bi); - BlockList_i bj = bi++; - blks_to_process.erase(bj); + // note, blocks with error should not be here !!! + // they should be either removed or reissued in ProcessBlockResponse() + assert(bi->second->is_ok()); + + blks_ready[bi->second].emplace_back( ChunkRequest(nullptr, iUserBuff + off, blk_off, size) ); + + if (bi->second->m_prefetch) + ++prefetch_cnt; } else { - ++bi; - } - } + if ( ! read_req) + read_req = new ReadRequest(io, rh); - if (finished.empty() && to_reissue.empty()) - { - m_state_cond.Wait(); - continue; - } - } + // We have a lock on state_cond --> as we register the request before releasing the lock, + // we are sure to get a call-in via the ChunkRequest handling when this block arrives. - ProcessBlockRequests(to_reissue); - to_reissue.clear(); + bi->second->m_chunk_reqs.emplace_back( ChunkRequest(read_req, iUserBuff + off, blk_off, size) ); + ++read_req->m_n_chunk_reqs; + } - BlockList_i bi = finished.begin(); - while (bi != finished.end()) - { - if ((*bi)->is_ok()) + lbe = LB_other; + } + // On disk? 
+ else if (m_cfi.TestBitWritten(offsetIdx(block_idx))) { - long long user_off; // offset in user buffer - long long off_in_block; // offset in block - long long size_to_copy; // size to copy - - overlap((*bi)->m_offset/BS, BS, iUserOff, iUserSize, user_off, off_in_block, size_to_copy); + TRACEF(Dump, tpfx << "read from disk " << (void*)iUserBuff << " idx = " << block_idx); - TRACEF(Dump, "Read() ub=" << (void*)iUserBuff << " from finished block " << (*bi)->m_offset/BS << " size " << size_to_copy); - memcpy(&iUserBuff[user_off], &((*bi)->m_buff[off_in_block]), size_to_copy); - bytes_read += size_to_copy; - - if (requested_blocks.find(*bi) == requested_blocks.end()) - loc_stats.m_BytesHit += size_to_copy; + if (lbe == LB_disk) + iovec_disk.back().size += size; else - loc_stats.m_BytesMissed += size_to_copy; + iovec_disk.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } ); + iovec_disk_total += size; + + if (m_cfi.TestBitPrefetch(offsetIdx(block_idx))) + ++prefetch_cnt; - if ((*bi)->m_prefetch) - prefetchHitsRam++; + lbe = LB_disk; } + // Neither ... then we have to go get it ... else { - // It has failed ... report only the first error. - if ( ! error_cond) + if ( ! read_req) + read_req = new ReadRequest(io, rh); + + // Is there room for one more RAM Block? + Block *b = PrepareBlockRequest(block_idx, io, read_req, false); + if (b) { - error_cond = (*bi)->m_errno; - TRACEF(Error, "Read() io " << io << ", block "<< (*bi)->m_offset/BS << - " finished with error " << -error_cond << " " << XrdSysE2T(-error_cond)); + TRACEF(Dump, tpfx << "inc_ref_count new " << (void*)iUserBuff << " idx = " << block_idx); + inc_ref_count(b); + blks_to_request.push_back(b); + + b->m_chunk_reqs.emplace_back(ChunkRequest(read_req, iUserBuff + off, blk_off, size)); + ++read_req->m_n_chunk_reqs; + + lbe = LB_other; + } + else // Nope ... read this directly without caching. 
+ { + TRACEF(Dump, tpfx << "direct block " << block_idx << ", blk_off " << blk_off << ", size " << size); + + if (lbe == LB_direct) + iovec_direct.back().size += size; + else + iovec_direct.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } ); + iovec_direct_total += size; + read_req->m_direct_done = false; + + lbe = LB_direct; } } - ++bi; - } + } // end for over blocks in an IOVec + } // end for over readV IOVec + + inc_prefetch_hit_cnt(prefetch_cnt); - std::copy(finished.begin(), finished.end(), std::back_inserter(blks_processed)); - finished.clear(); + m_state_cond.UnLock(); + + // First, send out remote requests for new blocks. + if ( ! blks_to_request.empty()) + { + ProcessBlockRequests(blks_to_request); + blks_to_request.clear(); } - // Fourth, make sure all direct requests have arrived. - // This can not be skipped as responses write into request memory buffers. - if (direct_handler != 0) + // Second, send out remote direct read requests. + if ( ! iovec_direct.empty()) { - TRACEF(Dump, "Read() waiting for direct requests "); + DirectResponseHandler *direct_handler = new DirectResponseHandler(this, read_req, 1); + RequestBlocksDirect(io, direct_handler, iovec_direct, iovec_direct_total); - XrdSysCondVarHelper _lck(direct_handler->m_cond); + TRACEF(Dump, tpfx << "direct read requests sent out, n_chunks = " << (int) iovec_direct.size() << ", total_size = " << iovec_direct_total); + } - while (direct_handler->m_to_wait > 0) + // Begin synchronous part where we process data that is already in RAM or on disk. + + long long bytes_read = 0; + int error_cond = 0; // to be set to -errno + + // Third, process blocks that are available in RAM. + if ( ! 
blks_ready.empty()) + { + for (auto &bvi : blks_ready) { - direct_handler->m_cond.Wait(); + for (auto &cr : bvi.second) + { + TRACEF(Dump, tpfx << "ub=" << (void*)cr.m_buf << " from pre-finished block " << bvi.first->m_offset/m_block_size << " size " << cr.m_size); + memcpy(cr.m_buf, bvi.first->m_buff + cr.m_off, cr.m_size); + bytes_read += cr.m_size; + } } + } - if (direct_handler->m_errno == 0) + // Fourth, read blocks from disk. + if ( ! iovec_disk.empty()) + { + int rc = ReadBlocksFromDisk(iovec_disk, iovec_disk_total); + TRACEF(Dump, tpfx << "from disk finished size = " << rc); + if (rc >= 0) { - bytes_read += direct_size; - loc_stats.m_BytesBypassed += direct_size; + bytes_read += rc; } else { - // Set error and report only if this is the first error in this read. - if ( ! error_cond) - { - error_cond = direct_handler->m_errno; - TRACEF(Error, "Read(), direct read finished with error " << -error_cond << " " << XrdSysE2T(-error_cond)); - } + error_cond = rc; + TRACEF(Error, tpfx << "failed read from disk"); } - - delete direct_handler; } - assert(iUserSize >= bytes_read); - // Last, stamp and release blocks, release file. - { - XrdSysCondVarHelper _lck(m_state_cond); + // End synchronous part -- update with sync stats and determine actual state of this read. + // Note: remote reads might have already finished during disk-read! + + m_state_cond.Lock(); - // blks_to_process can be non-empty, if we're exiting with an error. 
- std::copy(blks_to_process.begin(), blks_to_process.end(), std::back_inserter(blks_processed)); + for (auto &bvi : blks_ready) + dec_ref_count(bvi.first, (int) bvi.second.size()); - for (BlockList_i bi = blks_processed.begin(); bi != blks_processed.end(); ++bi) + if (read_req) + { + read_req->m_bytes_read += bytes_read; + read_req->update_error_cond(error_cond); + read_req->m_stats.m_BytesHit += bytes_read; + read_req->m_sync_done = true; + + if (read_req->is_complete()) { - TRACEF(Dump, "Read() dec_ref_count " << (void*)(*bi) << " idx = " << (int)((*bi)->m_offset/BufferSize())); - dec_ref_count(*bi); - } + // Almost like FinalizeReadRequest(read_req) -- but no callout! + + m_state_cond.UnLock(); - // update prefetch score - m_prefetch_hit_cnt += prefetchHitsRam; - for (IntList_i d = blks_on_disk.begin(); d != blks_on_disk.end(); ++d) + m_stats.AddReadStats(read_req->m_stats); + + int ret = read_req->return_value(); + delete read_req; + return ret; + } + else { - if (m_cfi.TestBitPrefetch(offsetIdx(*d))) - m_prefetch_hit_cnt++; + m_state_cond.UnLock(); + return -EWOULDBLOCK; } - m_prefetch_score = float(m_prefetch_hit_cnt)/m_prefetch_read_cnt; } + else + { + m_stats.m_BytesHit += bytes_read; + + m_state_cond.UnLock(); - m_stats.AddReadStats(loc_stats); + // !!! No callout. - return error_cond ? error_cond : bytes_read; + return error_cond ? error_cond : bytes_read; + } } -//------------------------------------------------------------------------------ + +//============================================================================== +// WriteBlock and Sync +//============================================================================== void File::WriteBlockToDisk(Block* b) { @@ -969,7 +956,7 @@ void File::WriteBlockToDisk(Block* b) return; } - const int blk_idx = (b->m_offset - m_offset) / m_cfi.GetBufferSize(); + const int blk_idx = (b->m_offset - m_offset) / m_block_size; // Set written bit. 
TRACEF(Dump, "WriteToDisk() success set bit for block " << b->m_offset << " size=" << size); @@ -1001,7 +988,7 @@ void File::WriteBlockToDisk(Block* b) { m_cfi.SetBitSynced(blk_idx); ++m_non_flushed_cnt; - if (m_non_flushed_cnt >= Cache::GetInstance().RefConfiguration().m_flushCnt && + if ((m_cfi.IsComplete() || m_non_flushed_cnt >= Cache::GetInstance().RefConfiguration().m_flushCnt) && ! m_in_shutdown) { schedule_sync = true; @@ -1029,7 +1016,7 @@ void File::Sync() { Stats loc_stats = m_stats.Clone(); m_cfi.WriteIOStat(loc_stats); - m_cfi.Write(m_info_file,m_filename.c_str()); + m_cfi.Write(m_info_file, m_filename.c_str()); int cret = m_info_file->Fsync(); if (cret != XrdOssOK) { @@ -1058,7 +1045,8 @@ void File::Sync() return; } - int written_while_in_sync; + int written_while_in_sync; + bool resync = false; { XrdSysCondVarHelper _lck(&m_state_cond); for (std::vector::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i) @@ -1067,39 +1055,29 @@ void File::Sync() } written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size(); m_writes_during_sync.clear(); - m_in_sync = false; - } - TRACEF(Dump, "Sync "<< written_while_in_sync << " blocks written during sync"); -} -//------------------------------------------------------------------------------ + // If there were writes during sync and the file is now complete, + // let us call Sync again without resetting the m_in_sync flag. + if (written_while_in_sync > 0 && m_cfi.IsComplete() && ! m_in_shutdown) + resync = true; + else + m_in_sync = false; + } + TRACEF(Dump, "Sync "<< written_while_in_sync << " blocks written during sync." << (resync ? " File is now complete - resyncing." : "")); -void File::inc_ref_count(Block* b) -{ - // Method always called under lock. 
- b->m_refcnt++; - TRACEF(Dump, "inc_ref_count " << b << " refcnt " << b->m_refcnt); + if (resync) + Sync(); } -//------------------------------------------------------------------------------ - -void File::dec_ref_count(Block* b) -{ - // Method always called under lock. - assert(b->is_finished()); - b->m_refcnt--; - assert(b->m_refcnt >= 0); - if (b->m_refcnt == 0) - { - free_block(b); - } -} +//============================================================================== +// Block processing +//============================================================================== void File::free_block(Block* b) { // Method always called under lock. - int i = b->m_offset / BufferSize(); + int i = b->m_offset / m_block_size; TRACEF(Dump, "free_block block " << b << " idx = " << i); size_t ret = m_block_map.erase(i); if (ret != 1) @@ -1168,13 +1146,97 @@ bool File::select_current_io_or_disable_prefetching(bool skip_current) //------------------------------------------------------------------------------ -void File::ProcessBlockResponse(BlockResponseHandler* brh, int res) +void File::ProcessDirectReadFinished(ReadRequest *rreq, int bytes_read, int error_cond) { - static const char* tpfx = "ProcessBlockResponse "; + // Called from DirectResponseHandler. + // NOT under lock. + + TRACEF(Error, "Read(), direct read finished with error " << -error_cond << " " << XrdSysE2T(-error_cond)); + + m_state_cond.Lock(); + + if (error_cond) + rreq->update_error_cond(error_cond); + else + rreq->m_stats.m_BytesBypassed += bytes_read; + + rreq->m_direct_done = true; + + bool rreq_complete = rreq->is_complete(); + + m_state_cond.UnLock(); + + if (rreq_complete) + FinalizeReadRequest(rreq); +} + +void File::ProcessBlockError(Block *b, ReadRequest *rreq) +{ + // Called from ProcessBlockResponse(). + // YES under lock -- we have to protect m_block_map for recovery through multiple IOs. + // Does not manage m_read_req. + // Will not complete the request. 
+ + TRACEF(Error, "ProcessBlockError() io " << b->m_io << ", block "<< b->m_offset/m_block_size << + " finished with error " << -b->get_error() << " " << XrdSysE2T(-b->get_error())); + + rreq->update_error_cond(b->get_error()); + --rreq->m_n_chunk_reqs; + + dec_ref_count(b); +} + +void File::ProcessBlockSuccess(Block *b, ChunkRequest &creq) +{ + // Called from ProcessBlockResponse(). + // NOT under lock as it does memcpy of existing block data. + // Acquires lock for block, m_read_req and rreq state update. + + ReadRequest *rreq = creq.m_read_req; + + TRACEF(Dump, "ProcessBlockSuccess() ub=" << (void*)creq.m_buf << " from finished block " << b->m_offset/m_block_size << " size " << creq.m_size); + memcpy(creq.m_buf, b->m_buff + creq.m_off, creq.m_size); + + m_state_cond.Lock(); + + rreq->m_bytes_read += creq.m_size; + + if (b->get_req_id() == (void*) rreq) + rreq->m_stats.m_BytesMissed += creq.m_size; + else + rreq->m_stats.m_BytesHit += creq.m_size; + + --rreq->m_n_chunk_reqs; + + dec_ref_count(b); + + if (b->m_prefetch) + inc_prefetch_hit_cnt(1); + + bool rreq_complete = rreq->is_complete(); - Block *b = brh->m_block; + m_state_cond.UnLock(); + + if (rreq_complete) + FinalizeReadRequest(rreq); +} + +void File::FinalizeReadRequest(ReadRequest *rreq) +{ + // called from ProcessBlockResponse() + // NOT under lock -- does callout + + m_stats.AddReadStats(rreq->m_stats); + + rreq->m_rh->Done(rreq->return_value()); + delete rreq; +} - TRACEF(Dump, tpfx << "block=" << b << ", idx=" << b->m_offset/BufferSize() << ", off=" << b->m_offset << ", res=" << res); +void File::ProcessBlockResponse(Block *b, int res) +{ + static const char* tpfx = "ProcessBlockResponse "; + + TRACEF(Dump, tpfx << "block=" << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset << ", res=" << res); if (res >= 0 && res != b->get_size()) { @@ -1184,7 +1246,7 @@ void File::ProcessBlockResponse(BlockResponseHandler* brh, int res) Cache::GetInstance().UnlinkFile(m_filename, false); } - 
XrdSysCondVarHelper _lck(m_state_cond); + m_state_cond.Lock(); // Deregister block from IO's prefetch count, if needed. if (b->m_prefetch) @@ -1226,20 +1288,31 @@ void File::ProcessBlockResponse(BlockResponseHandler* brh, int res) { b->set_downloaded(); // Increase ref-count for the writer. - TRACEF(Dump, tpfx << "inc_ref_count idx=" << b->m_offset/BufferSize()); + TRACEF(Dump, tpfx << "inc_ref_count idx=" << b->m_offset/m_block_size); if ( ! m_in_shutdown) { inc_ref_count(b); m_stats.AddWriteStats(b->get_size(), b->get_n_cksum_errors()); cache()->AddWriteTask(b, true); } + + // Swap chunk-reqs vector out of Block, it will be processed outside of lock. + vChunkRequest_t creqs_to_notify; + creqs_to_notify.swap( b->m_chunk_reqs ); + + m_state_cond.UnLock(); + + for (auto &creq : creqs_to_notify) + { + ProcessBlockSuccess(b, creq); + } } else { if (res < 0) { - TRACEF(Error, tpfx << "block " << b << ", idx=" << b->m_offset/BufferSize() << ", off=" << b->m_offset << " error=" << res); + TRACEF(Error, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset << " error=" << res); } else { - TRACEF(Error, tpfx << "block " << b << ", idx=" << b->m_offset/BufferSize() << ", off=" << b->m_offset << " incomplete, got " << res << " expected " << b->get_size()); + TRACEF(Error, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset << " incomplete, got " << res << " expected " << b->get_size()); #if defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) || defined(__FreeBSD__) res = -EIO; #else @@ -1247,14 +1320,50 @@ void File::ProcessBlockResponse(BlockResponseHandler* brh, int res) #endif } b->set_error(res); - } - m_state_cond.Broadcast(); -} + // Loop over Block's chunk-reqs vector, error out ones with the same IO. + // Collect others with a different IO, the first of them will be used to reissue the request. + // This is then done outside of lock. 
+ std::list rreqs_to_complete; + vChunkRequest_t creqs_to_keep; -long long File::BufferSize() -{ - return m_cfi.GetBufferSize(); + for(ChunkRequest &creq : b->m_chunk_reqs) + { + ReadRequest *rreq = creq.m_read_req; + + if (rreq->m_io == b->get_io()) + { + ProcessBlockError(b, rreq); + if (rreq->is_complete()) + { + rreqs_to_complete.push_back(rreq); + } + } + else + { + creqs_to_keep.push_back(creq); + } + } + + if ( ! creqs_to_keep.empty()) + { + ReadRequest *rreq = creqs_to_keep.front().m_read_req; + + TRACEF(Info, "ProcessBlockResponse() requested block " << (void*)b << " failed with another io " << + b->get_io() << " - reissuing request with my io " << rreq->m_io); + + b->reset_error_and_set_io(rreq->m_io, rreq); + b->m_chunk_reqs.swap( creqs_to_keep ); + } + + m_state_cond.UnLock(); + + if ( ! b->m_chunk_reqs.empty()) + ProcessBlockRequest(b); + + for (auto rreq : rreqs_to_complete) + FinalizeReadRequest(rreq); + } } //------------------------------------------------------------------------------ @@ -1266,9 +1375,9 @@ const char* File::lPath() const //------------------------------------------------------------------------------ -int File::offsetIdx(int iIdx) +int File::offsetIdx(int iIdx) const { - return iIdx - m_offset/m_cfi.GetBufferSize(); + return iIdx - m_offset/m_block_size; } @@ -1298,23 +1407,23 @@ void File::Prefetch() } // Select block(s) to fetch. - for (int f = 0; f < m_cfi.GetNBlocks(); ++f) + for (int f = 0; f < m_num_blocks; ++f) { if ( ! m_cfi.TestBitWritten(f)) { - int f_act = f + m_offset / m_cfi.GetBufferSize(); + int f_act = f + m_offset / m_block_size; BlockMap_i bi = m_block_map.find(f_act); if (bi == m_block_map.end()) { - Block *b = PrepareBlockRequest(f_act, m_current_io->first, true); + Block *b = PrepareBlockRequest(f_act, m_current_io->first, nullptr, true); if (b) { TRACEF(Dump, "Prefetch take block " << f_act); blks.push_back(b); // Note: block ref_cnt not increased, it will be when placed into write queue. 
- m_prefetch_read_cnt++; - m_prefetch_score = float(m_prefetch_hit_cnt)/m_prefetch_read_cnt; + + inc_prefetch_read_cnt(1); } else { @@ -1405,8 +1514,7 @@ std::string File::GetRemoteLocations() const void BlockResponseHandler::Done(int res) { - m_block->m_file->ProcessBlockResponse(this, res); - + m_block->m_file->ProcessBlockResponse(m_block, res); delete this; } @@ -1414,17 +1522,21 @@ void BlockResponseHandler::Done(int res) void DirectResponseHandler::Done(int res) { - XrdSysCondVarHelper _lck(m_cond); + m_mutex.Lock(); - --m_to_wait; + int n_left = --m_to_wait; - if (res < 0) - { - m_errno = res; + if (res < 0) { + if (m_errno == 0) m_errno = res; // store first reported error + } else { + m_bytes_read += res; } - if (m_to_wait == 0) + m_mutex.UnLock(); + + if (n_left == 0) { - m_cond.Signal(); + m_file->ProcessDirectReadFinished(m_read_req, m_bytes_read, m_errno); + delete this; } } diff --git a/src/XrdPfc/XrdPfcFile.hh b/src/XrdPfc/XrdPfcFile.hh index 518dda5db00..2081f53b637 100644 --- a/src/XrdPfc/XrdPfcFile.hh +++ b/src/XrdPfc/XrdPfcFile.hh @@ -27,9 +27,10 @@ #include "XrdPfcInfo.hh" #include "XrdPfcStats.hh" -#include +#include #include #include +#include class XrdJob; class XrdOucIOVec; @@ -54,14 +55,70 @@ struct ReadVChunkListDisk; namespace XrdPfc { +class File; + +struct ReadReqRH : public XrdOucCacheIOCB +{ + int m_expected_size = 0; + int m_n_chunks = 0; // Only set for ReadV(). + unsigned short m_seq_id; + XrdOucCacheIOCB *m_iocb; // External callback passed into IO::Read(). + + ReadReqRH(unsigned short sid, XrdOucCacheIOCB *iocb) : + m_seq_id(sid), m_iocb(iocb) + {} +}; + +// ------------------------------------------------------------- + +struct ReadRequest +{ + IO *m_io; + ReadReqRH *m_rh; // Internal callback created in IO::Read(). 
+ + long long m_bytes_read = 0; + int m_error_cond = 0; // to be set to -errno + Stats m_stats; -class File; + int m_n_chunk_reqs = 0; + bool m_sync_done = false; + bool m_direct_done = true; + + ReadRequest(IO *io, ReadReqRH *rh) : + m_io(io), m_rh(rh) + {} + + void update_error_cond(int ec) { if (m_error_cond == 0 ) m_error_cond = ec; } + + bool is_complete() const { return m_n_chunk_reqs == 0 && m_sync_done && m_direct_done; } + int return_value() const { return m_error_cond ? m_error_cond : m_bytes_read; } +}; + +// ------------------------------------------------------------- + +struct ChunkRequest +{ + ReadRequest *m_read_req; + char *m_buf; // Where to place the data chunk. + long long m_off; // Offset *within* the corresponding block. + int m_size; // Size of the data chunk. + + ChunkRequest(ReadRequest *rreq, char *buf, long long off, int size) : + m_read_req(rreq), m_buf(buf), m_off(off), m_size(size) + {} +}; + +using vChunkRequest_t = std::vector; +using vChunkRequest_i = std::vector::iterator; + +// ================================================================ class Block { public: File *m_file; IO *m_io; // IO that handled current request, used for == / != comparisons only + void *m_req_id; // Identity of requestor -- used for stats. 
char *m_buff; long long m_offset; @@ -75,30 +132,38 @@ public: vCkSum_t m_cksum_vec; int m_n_cksum_errors; - Block(File *f, IO *io, char *buf, long long off, int size, int rsize, bool m_prefetch, bool cks_net) : - m_file(f), m_io(io), m_buff(buf), m_offset(off), m_size(size), m_req_size(rsize), + vChunkRequest_t m_chunk_reqs; + + Block(File *f, IO *io, void *rid, char *buf, long long off, int size, int rsize, + bool m_prefetch, bool cks_net) : + m_file(f), m_io(io), m_req_id(rid), + m_buff(buf), m_offset(off), m_size(size), m_req_size(rsize), m_refcnt(0), m_errno(0), m_downloaded(false), m_prefetch(m_prefetch), m_req_cksum_net(cks_net), m_n_cksum_errors(0) {} - char* get_buff() { return m_buff; } - int get_size() { return m_size; } - int get_req_size() { return m_req_size; } - long long get_offset() { return m_offset; } + char* get_buff() const { return m_buff; } + int get_size() const { return m_size; } + int get_req_size() const { return m_req_size; } + long long get_offset() const { return m_offset; } - IO* get_io() const { return m_io; } + File* get_file() const { return m_file; } + IO* get_io() const { return m_io; } + void* get_req_id() const { return m_req_id; } - bool is_finished() { return m_downloaded || m_errno != 0; } - bool is_ok() { return m_downloaded; } - bool is_failed() { return m_errno != 0; } + bool is_finished() const { return m_downloaded || m_errno != 0; } + bool is_ok() const { return m_downloaded; } + bool is_failed() const { return m_errno != 0; } void set_downloaded() { m_downloaded = true; } void set_error(int err) { m_errno = err; } + int get_error() const { return m_errno; } - void reset_error_and_set_io(IO *io) + void reset_error_and_set_io(IO *io, void *rid) { - m_errno = 0; - m_io = io; + m_errno = 0; + m_io = io; + m_req_id = rid; } bool req_cksum_net() const { return m_req_cksum_net; } @@ -106,9 +171,11 @@ public: vCkSum_t& ref_cksum_vec() { return m_cksum_vec; } int get_n_cksum_errors() { return m_n_cksum_errors; } int* 
ptr_n_cksum_errors() { return &m_n_cksum_errors; } - }; +using BlockList_t = std::list; +using BlockList_i = std::list::iterator; + // ================================================================ class BlockResponseHandler : public XrdOucCacheIOCB @@ -118,45 +185,41 @@ public: BlockResponseHandler(Block *b) : m_block(b) {} - virtual void Done(int result); + void Done(int result) override; }; -// ================================================================ +// ---------------------------------------------------------------- class DirectResponseHandler : public XrdOucCacheIOCB { public: - XrdSysCondVar m_cond; - int m_to_wait; - int m_errno; - - DirectResponseHandler(int to_wait) : m_cond(0), m_to_wait(to_wait), m_errno(0) {} - - bool is_finished() { XrdSysCondVarHelper _lck(m_cond); return m_to_wait == 0; } - bool is_ok() { XrdSysCondVarHelper _lck(m_cond); return m_to_wait == 0 && m_errno == 0; } - bool is_failed() { XrdSysCondVarHelper _lck(m_cond); return m_errno != 0; } + XrdSysMutex m_mutex; + File *m_file; + ReadRequest *m_read_req; + int m_to_wait; + int m_bytes_read = 0; + int m_errno = 0; + + DirectResponseHandler(File *file, ReadRequest *rreq, int to_wait) : + m_file(file), m_read_req(rreq), m_to_wait(to_wait) + {} - virtual void Done(int result); + void Done(int result) override; }; // ================================================================ class File { + friend class BlockResponseHandler; + friend class DirectResponseHandler; public: - //------------------------------------------------------------------------ - //! Constructor. - //------------------------------------------------------------------------ - File(const std::string &path, long long offset, long long fileSize); + // Constructor and Open() are private. - //------------------------------------------------------------------------ //! Static constructor that also does Open. Returns null ptr if Open fails. 
- //------------------------------------------------------------------------ static File* FileOpen(const std::string &path, long long offset, long long fileSize); - //------------------------------------------------------------------------ //! Destructor. - //------------------------------------------------------------------------ ~File(); //! Handle removal of a block from Cache's write queue. @@ -165,19 +228,11 @@ public: //! Handle removal of a set of blocks from Cache's write queue. void BlocksRemovedFromWriteQ(std::list&); - //! Open file handle for data file and info file on local disk. - bool Open(); - - //! Vector read from disk if block is already downloaded, else ReadV from client. - int ReadV(IO *io, const XrdOucIOVec *readV, int n); - //! Normal read. - int Read (IO *io, char* buff, long long offset, int size); + int Read(IO *io, char* buff, long long offset, int size, ReadReqRH *rh); - //---------------------------------------------------------------------- - //! \brief Data and cinfo files are open. - //---------------------------------------------------------------------- - bool isOpen() const { return m_is_open; } + //! Vector read. + int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh); //---------------------------------------------------------------------- //! \brief Notification from IO that it has been updated (remote open). @@ -207,8 +262,6 @@ public: //---------------------------------------------------------------------- void Sync(); - - void ProcessBlockResponse(BlockResponseHandler* brh, int res); void WriteBlockToDisk(Block* b); void Prefetch(); @@ -249,20 +302,23 @@ public: bool is_in_emergency_shutdown() { return m_in_shutdown; } private: - enum PrefetchState_e { kOff=-1, kOn, kHold, kStopped, kComplete }; + //! Constructor. + File(const std::string &path, long long offset, long long fileSize); + + //! Open file handle for data file and info file on local disk. 
+ bool Open(); + + static const char *m_traceID; int m_ref_cnt; //!< number of references from IO or sync - bool m_is_open; //!< open state (presumably not needed anymore) - bool m_in_shutdown; //!< file is in emergency shutdown due to irrecoverable error or unlink request - - XrdOssDF *m_data_file; //!< file handle for data file on disk - XrdOssDF *m_info_file; //!< file handle for data-info file on disk + XrdOssDF *m_data_file; //!< file handle for data file on disk + XrdOssDF *m_info_file; //!< file handle for data-info file on disk Info m_cfi; //!< download status of file blocks and access statistics std::string m_filename; //!< filename of data file on disk long long m_offset; //!< offset of cached file for block-based / hdfs operation - long long m_file_size; //!< size of cached disk file for block-based operation + long long m_file_size; //!< size of cached disk file for block-based operation // IO objects attached to this file. @@ -286,27 +342,28 @@ private: IoMap_i m_current_io; //!< IO object to be used for prefetching. int m_ios_in_detach; //!< Number of IO objects to which we replied false to ioActive() and will be removed soon. 
- // fsync + // FSync + std::vector m_writes_during_sync; int m_non_flushed_cnt; bool m_in_sync; + bool m_detach_time_logged; + bool m_in_shutdown; //!< file is in emergency shutdown due to irrecoverable error or unlink request + + // Block state and management typedef std::list IntList_t; typedef IntList_t::iterator IntList_i; - typedef std::list BlockList_t; - typedef BlockList_t::iterator BlockList_i; - typedef std::map BlockMap_t; typedef BlockMap_t::iterator BlockMap_i; - typedef std::set BlockSet_t; - typedef BlockSet_t::iterator BlockSet_i; - - - BlockMap_t m_block_map; - + BlockMap_t m_block_map; XrdSysCondVar m_state_cond; + long long m_block_size; + int m_num_blocks; + + // Stats Stats m_stats; //!< cache statistics for this instance Stats m_last_stats; //!< copy of cache stats during last purge cycle, used for per directory stat reporting @@ -314,15 +371,21 @@ private: std::set m_remote_locations; //!< Gathered in AddIO / ioUpdate / ioActive. void insert_remote_location(const std::string &loc); + // Prefetch + + enum PrefetchState_e { kOff=-1, kOn, kHold, kStopped, kComplete }; + PrefetchState_e m_prefetch_state; int m_prefetch_read_cnt; int m_prefetch_hit_cnt; float m_prefetch_score; // cached - - bool m_detach_time_logged; - static const char *m_traceID; + void inc_prefetch_read_cnt(int prc) { if (prc) { m_prefetch_read_cnt += prc; calc_prefetch_score(); } } + void inc_prefetch_hit_cnt (int phc) { if (phc) { m_prefetch_hit_cnt += phc; calc_prefetch_score(); } } + void calc_prefetch_score() { m_prefetch_score = float(m_prefetch_hit_cnt) / m_prefetch_read_cnt; } + + // Helpers bool overlap(int blk, // block to query long long blk_size, // @@ -331,46 +394,63 @@ private: // output: long long &off, // offset in user buffer long long &blk_off, // offset in block - long long &size); + int &size); + + // Read & ReadV + + Block* PrepareBlockRequest(int i, IO *io, void *req_id, bool prefetch); - // Read - Block* PrepareBlockRequest(int i, IO *io, bool prefetch); 
- void ProcessBlockRequest (Block *b); void ProcessBlockRequests(BlockList_t& blks); - int RequestBlocksDirect(IO *io, DirectResponseHandler *handler, IntList_t& blocks, - char* buff, long long req_off, long long req_size); - - int ReadBlocksFromDisk(IntList_t& blocks, - char* req_buf, long long req_off, long long req_size); - - // VRead - bool VReadValidate (const XrdOucIOVec *readV, int n); - void VReadPreProcess (IO *io, const XrdOucIOVec *readV, int n, - BlockList_t& blks_to_request, - ReadVBlockListRAM& blks_to_process, - ReadVBlockListDisk& blks_on_disk, - std::vector& chunkVec); - int VReadFromDisk (const XrdOucIOVec *readV, int n, - ReadVBlockListDisk& blks_on_disk); - int VReadProcessBlocks(IO *io, const XrdOucIOVec *readV, int n, - std::vector& blks_to_process, - std::vector& blks_processed, - long long& bytes_hit, - long long& bytes_missed); - - long long BufferSize(); - - void inc_ref_count(Block*); - void dec_ref_count(Block*); + void RequestBlocksDirect(IO *io, DirectResponseHandler *handler, std::vector& ioVec, int expected_size); + + int ReadBlocksFromDisk(std::vector& ioVec, int expected_size); + + int ReadOpusCoalescere(IO *io, const XrdOucIOVec *readV, int readVnum, + ReadReqRH *rh, const char *tpfx); + + void ProcessDirectReadFinished(ReadRequest *rreq, int bytes_read, int error_cond); + void ProcessBlockError(Block *b, ReadRequest *rreq); + void ProcessBlockSuccess(Block *b, ChunkRequest &creq); + void FinalizeReadRequest(ReadRequest *rreq); + + void ProcessBlockResponse(Block *b, int res); + + // Block management + + void inc_ref_count(Block* b); + void dec_ref_count(Block* b, int count = 1); void free_block(Block*); bool select_current_io_or_disable_prefetching(bool skip_current); - int offsetIdx(int idx); + int offsetIdx(int idx) const; }; +//------------------------------------------------------------------------------ + +inline void File::inc_ref_count(Block* b) +{ + // Method always called under lock. 
+ b->m_refcnt++; +} + +//------------------------------------------------------------------------------ + +inline void File::dec_ref_count(Block* b, int count) +{ + // Method always called under lock. + assert(b->is_finished()); + b->m_refcnt -= count; + assert(b->m_refcnt >= 0); + + if (b->m_refcnt == 0) + { + free_block(b); + } +} + } #endif diff --git a/src/XrdPfc/XrdPfcIO.cc b/src/XrdPfc/XrdPfcIO.cc index f664d6f6b7f..eec8eae2e88 100644 --- a/src/XrdPfc/XrdPfcIO.cc +++ b/src/XrdPfc/XrdPfcIO.cc @@ -6,7 +6,8 @@ using namespace XrdPfc; IO::IO(XrdOucCacheIO *io, Cache &cache) : m_cache (cache), m_traceID ("IO"), - m_io (io) + m_io (io), + m_read_seqid (0u) {} //============================================================================== diff --git a/src/XrdPfc/XrdPfcIO.hh b/src/XrdPfc/XrdPfcIO.hh index 7e971254a43..10490f30514 100644 --- a/src/XrdPfc/XrdPfcIO.hh +++ b/src/XrdPfc/XrdPfcIO.hh @@ -13,7 +13,7 @@ class XrdSysTrace; namespace XrdPfc { //---------------------------------------------------------------------------- -//! Base cache-io class that implements XrdOucCacheIO abstract methods. +//! Base cache-io class that implements some XrdOucCacheIO abstract methods. //---------------------------------------------------------------------------- class IO : public XrdOucCacheIO { @@ -24,25 +24,22 @@ public: virtual XrdOucCacheIO *Base() { return m_io; } //! Original data source URL. 
- virtual const char *Path() { return m_io.load(std::memory_order_relaxed)->Path(); } + const char *Path() override { return m_io.load(std::memory_order_relaxed)->Path(); } using XrdOucCacheIO::Sync; - - virtual int Sync() { return 0; } + int Sync() override { return 0; } using XrdOucCacheIO::Trunc; - - virtual int Trunc(long long Offset) { return -ENOTSUP; } + int Trunc(long long Offset) override { return -ENOTSUP; } using XrdOucCacheIO::Write; + int Write(char *Buffer, long long Offset, int Length) override { return -ENOTSUP; } - virtual int Write(char *Buffer, long long Offset, int Length) { return -ENOTSUP; } - - virtual void Update(XrdOucCacheIO &iocp); + void Update(XrdOucCacheIO &iocp) override; // Detach is virtual from XrdOucCacheIO, here it is split // into abstract ioActive() and DetachFinalize(). - bool Detach(XrdOucCacheIOCD &iocdP) /* final */; + bool Detach(XrdOucCacheIOCD &iocdP) final; virtual bool ioActive() = 0; virtual void DetachFinalize() = 0; @@ -53,17 +50,31 @@ public: XrdOucCacheIO* GetInput(); protected: - Cache &m_cache; //!< reference to Cache needed in detach + Cache &m_cache; //!< reference to Cache object const char *m_traceID; const char* GetPath() { return m_io.load(std::memory_order_relaxed)->Path(); } std::string GetFilename() { return XrdCl::URL(GetPath()).GetPath(); } const char* RefreshLocation() { return m_io.load(std::memory_order_relaxed)->Location(true); } + unsigned short ObtainReadSid() { return m_read_seqid++; } + + struct ReadReqRHCond : public ReadReqRH + { + XrdSysCondVar m_cond {0}; + int m_retval {0}; + + using ReadReqRH::ReadReqRH; + + void Done(int result) override + { m_cond.Lock(); m_retval = result; m_cond.Signal(); m_cond.UnLock(); } + }; + private: - std::atomic m_io; //!< original data source + std::atomic m_io; //!< original data source + std::atomic m_read_seqid; //!< sequential read id (for logging) - void SetInput(XrdOucCacheIO*); + void SetInput(XrdOucCacheIO*); }; } diff --git 
a/src/XrdPfc/XrdPfcIOEntireFile.cc b/src/XrdPfc/XrdPfcIOEntireFile.cc deleted file mode 100644 index 7359257b59d..00000000000 --- a/src/XrdPfc/XrdPfcIOEntireFile.cc +++ /dev/null @@ -1,201 +0,0 @@ -//---------------------------------------------------------------------------------- -// Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University -// Author: Alja Mrak-Tadel, Matevz Tadel, Brian Bockelman -//---------------------------------------------------------------------------------- -// XRootD is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// XRootD is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with XRootD. If not, see . 
-//---------------------------------------------------------------------------------- - -#include -#include - -#include "XrdSys/XrdSysError.hh" -#include "XrdSfs/XrdSfsInterface.hh" -#include "XrdSys/XrdSysPthread.hh" - -#include "XrdPfcIOEntireFile.hh" -#include "XrdPfcStats.hh" -#include "XrdPfcTrace.hh" - -#include "XrdOuc/XrdOucEnv.hh" - -using namespace XrdPfc; - -//______________________________________________________________________________ -IOEntireFile::IOEntireFile(XrdOucCacheIO *io, Cache & cache) : - IO(io, cache), - m_file(0), - m_localStat(0) -{ - m_file = Cache::GetInstance().GetFile(GetFilename(), this); -} - -//______________________________________________________________________________ -IOEntireFile::~IOEntireFile() -{ - // called from Detach() if no sync is needed or - // from Cache's sync thread - TRACEIO(Debug, "~IOEntireFile() " << this); - - delete m_localStat; -} - -//______________________________________________________________________________ -int IOEntireFile::Fstat(struct stat &sbuff) -{ - std::string name = GetFilename() + Info::s_infoExtension; - - int res = 0; - if( ! m_localStat) - { - res = initCachedStat(name.c_str()); - if (res) return res; - } - - memcpy(&sbuff, m_localStat, sizeof(struct stat)); - return 0; -} - -//______________________________________________________________________________ -long long IOEntireFile::FSize() -{ - return m_file->GetFileSize(); -} - -//______________________________________________________________________________ -int IOEntireFile::initCachedStat(const char* path) -{ - // Called indirectly from the constructor. 
- - static const char* trace_pfx = "initCachedStat "; - - int res = -1; - struct stat tmpStat; - - if (m_cache.GetOss()->Stat(path, &tmpStat) == XrdOssOK) - { - XrdOssDF* infoFile = m_cache.GetOss()->newFile(Cache::GetInstance().RefConfiguration().m_username.c_str()); - XrdOucEnv myEnv; - int res_open; - if ((res_open = infoFile->Open(path, O_RDONLY, 0600, myEnv)) == XrdOssOK) - { - Info info(m_cache.GetTrace()); - if (info.Read(infoFile, path)) - { - tmpStat.st_size = info.GetFileSize(); - TRACEIO(Info, trace_pfx << "successfully read size from info file = " << tmpStat.st_size); - res = 0; - } - else - { - // file exist but can't read it - TRACEIO(Info, trace_pfx << "info file is incomplete or corrupt"); - } - } - else - { - TRACEIO(Error, trace_pfx << "can't open info file " << XrdSysE2T(-res_open)); - } - infoFile->Close(); - delete infoFile; - } - - if (res) - { - res = GetInput()->Fstat(tmpStat); - TRACEIO(Debug, trace_pfx << "got stat from client res = " << res << ", size = " << tmpStat.st_size); - } - - if (res == 0) - { - m_localStat = new struct stat; - memcpy(m_localStat, &tmpStat, sizeof(struct stat)); - } - return res; -} - -//______________________________________________________________________________ -void IOEntireFile::Update(XrdOucCacheIO &iocp) -{ - IO::Update(iocp); - m_file->ioUpdated(this); -} - -//______________________________________________________________________________ -bool IOEntireFile::ioActive() -{ - RefreshLocation(); - return m_file->ioActive(this); -} - -//______________________________________________________________________________ -void IOEntireFile::DetachFinalize() -{ - // Effectively a destructor. 
- - TRACE(Info, "DetachFinalize() " << this); - - m_file->RequestSyncOfDetachStats(); - Cache::GetInstance().ReleaseFile(m_file, this); - - delete this; -} - -//______________________________________________________________________________ -int IOEntireFile::Read(char *buff, long long off, int size) -{ - TRACEIO(Dump, "Read() "<< this << " off: " << off << " size: " << size); - - // protect from reads over the file size - if (off >= FSize()) - return 0; - if (off < 0) - { - return -EINVAL; - } - if (off + size > FSize()) - size = FSize() - off; - - - ssize_t bytes_read = 0; - ssize_t retval = 0; - - retval = m_file->Read(this, buff, off, size); - if (retval >= 0) - { - bytes_read = retval; - size -= retval; - - // All errors like this should have been already captured by File::Read() - // and reflected in its retval. - if (size > 0) - TRACEIO(Warning, "Read() bytes missed " << size); - } - else - { - TRACEIO(Warning, "Read() error in File::Read(), exit status=" << retval - << ", error=" << XrdSysE2T(-retval)); - } - - return (retval < 0) ? 
retval : bytes_read; -} - - -/* - * Perform a readv from the cache - */ -int IOEntireFile::ReadV(const XrdOucIOVec *readV, int n) -{ - TRACEIO(Dump, "ReadV(), get " << n << " requests" ); - return m_file->ReadV(this, readV, n); -} diff --git a/src/XrdPfc/XrdPfcIOFile.cc b/src/XrdPfc/XrdPfcIOFile.cc new file mode 100644 index 00000000000..2f1bedd6862 --- /dev/null +++ b/src/XrdPfc/XrdPfcIOFile.cc @@ -0,0 +1,349 @@ +//---------------------------------------------------------------------------------- +// Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University +// Author: Alja Mrak-Tadel, Matevz Tadel, Brian Bockelman +//---------------------------------------------------------------------------------- +// XRootD is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// XRootD is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with XRootD. If not, see . 
+//---------------------------------------------------------------------------------- + +#include +#include + +#include "XrdSys/XrdSysError.hh" +#include "XrdSfs/XrdSfsInterface.hh" +#include "XrdSys/XrdSysPthread.hh" + +#include "XrdPfcIOFile.hh" +#include "XrdPfcStats.hh" +#include "XrdPfcTrace.hh" + +#include "XrdOuc/XrdOucEnv.hh" +#include "XrdOuc/XrdOucPgrwUtils.hh" + +using namespace XrdPfc; + +//______________________________________________________________________________ +IOFile::IOFile(XrdOucCacheIO *io, Cache & cache) : + IO(io, cache), + m_file(0), + m_localStat(0) +{ + m_file = Cache::GetInstance().GetFile(GetFilename(), this); +} + +//______________________________________________________________________________ +IOFile::~IOFile() +{ + // called from Detach() if no sync is needed or + // from Cache's sync thread + TRACEIO(Debug, "~IOFile() " << this); + + delete m_localStat; +} + +//______________________________________________________________________________ +int IOFile::Fstat(struct stat &sbuff) +{ + std::string name = GetFilename() + Info::s_infoExtension; + + int res = 0; + if( ! m_localStat) + { + res = initCachedStat(name.c_str()); + if (res) return res; + } + + memcpy(&sbuff, m_localStat, sizeof(struct stat)); + return 0; +} + +//______________________________________________________________________________ +long long IOFile::FSize() +{ + return m_file->GetFileSize(); +} + +//______________________________________________________________________________ +int IOFile::initCachedStat(const char* path) +{ + // Called indirectly from the constructor. 
+ + static const char* trace_pfx = "initCachedStat "; + + int res = -1; + struct stat tmpStat; + + if (m_cache.GetOss()->Stat(path, &tmpStat) == XrdOssOK) + { + XrdOssDF* infoFile = m_cache.GetOss()->newFile(Cache::GetInstance().RefConfiguration().m_username.c_str()); + XrdOucEnv myEnv; + int res_open; + if ((res_open = infoFile->Open(path, O_RDONLY, 0600, myEnv)) == XrdOssOK) + { + Info info(m_cache.GetTrace()); + if (info.Read(infoFile, path)) + { + tmpStat.st_size = info.GetFileSize(); + TRACEIO(Info, trace_pfx << "successfully read size from info file = " << tmpStat.st_size); + res = 0; + } + else + { + // file exist but can't read it + TRACEIO(Info, trace_pfx << "info file is incomplete or corrupt"); + } + } + else + { + TRACEIO(Error, trace_pfx << "can't open info file " << XrdSysE2T(-res_open)); + } + infoFile->Close(); + delete infoFile; + } + + if (res) + { + res = GetInput()->Fstat(tmpStat); + TRACEIO(Debug, trace_pfx << "got stat from client res = " << res << ", size = " << tmpStat.st_size); + } + + if (res == 0) + { + m_localStat = new struct stat; + memcpy(m_localStat, &tmpStat, sizeof(struct stat)); + } + return res; +} + +//______________________________________________________________________________ +void IOFile::Update(XrdOucCacheIO &iocp) +{ + IO::Update(iocp); + m_file->ioUpdated(this); +} + +//______________________________________________________________________________ +bool IOFile::ioActive() +{ + RefreshLocation(); + return m_file->ioActive(this); +} + +//______________________________________________________________________________ +void IOFile::DetachFinalize() +{ + // Effectively a destructor. 
+ + TRACE(Info, "DetachFinalize() " << this); + + m_file->RequestSyncOfDetachStats(); + Cache::GetInstance().ReleaseFile(m_file, this); + + delete this; +} + + +//============================================================================== +// Read and pgRead - sync / async and helpers +//============================================================================== + +//______________________________________________________________________________ +int IOFile::Read(char *buff, long long off, int size) +{ + auto *rh = new ReadReqRHCond(ObtainReadSid(), nullptr); + + TRACEIO(Dump, "Read() sync " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " off: " << off << " size: " << size); + + rh->m_cond.Lock(); + int retval = ReadBegin(buff, off, size, rh); + if (retval == -EWOULDBLOCK) + { + rh->m_cond.Wait(); + retval = rh->m_retval; + } + rh->m_cond.UnLock(); + + return ReadEnd(retval, rh); +} + +//______________________________________________________________________________ +void IOFile::Read(XrdOucCacheIOCB &iocb, char *buff, long long off, int size) +{ + struct ZHandler : ReadReqRH + { using ReadReqRH::ReadReqRH; + IOFile *m_io = nullptr; + + void Done(int result) override { m_io->ReadEnd(result, this); } + }; + + auto *rh = new ZHandler(ObtainReadSid(), &iocb); + rh->m_io = this; + + TRACEIO(Dump, "Read() async " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " off: " << off << " size: " << size); + + int retval = ReadBegin(buff, off, size, rh); + if (retval != -EWOULDBLOCK) + { + rh->Done(retval); + } +} + +//______________________________________________________________________________ +void IOFile::pgRead(XrdOucCacheIOCB &iocb, char *buff, long long off, int size, + std::vector &csvec, uint64_t opts, int *csfix) +{ + struct ZHandler : ReadReqRH + { using ReadReqRH::ReadReqRH; + IOFile *m_io = nullptr; + std::function m_lambda {0}; + + void Done(int result) override { if (m_lambda) m_lambda(result); m_io-> ReadEnd(result, this); } + }; + + auto *rh = 
new ZHandler(ObtainReadSid(), &iocb); + rh->m_io = this; + + TRACEIO(Dump, "pgRead() async " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " off: " << off << " size: " << size); + + if (opts & XrdOucCacheIO::forceCS) + rh->m_lambda = [=, &csvec](int result) { + if (result > 0) + XrdOucPgrwUtils::csCalc((const char *)buff, (ssize_t)off, (size_t)result, csvec); + }; + + int retval = ReadBegin(buff, off, size, rh); + if (retval != -EWOULDBLOCK) + { + rh->Done(retval); + } +} + +//______________________________________________________________________________
+int IOFile::ReadBegin(char *buff, long long off, int size, ReadReqRH *rh)
+{
+   // Validate the request, clamp it to the file size and pass it on to m_file->Read().
+   // Returns -EINVAL for a negative off or size and 0 for a read at or past end of file.
+   if (off < 0 || size < 0) {
+      return -EINVAL;
+   }
+   const long long file_size = FSize(); // queried once so both checks use the same value
+   if (off >= file_size) {
+      return 0;
+   }
+   if (off + size > file_size) {
+      size = file_size - off; // positive and smaller than the requested size, fits in int
+   }
+   rh->m_expected_size = size; // compared against the actual result in ReadEnd()
+   return m_file->Read(this, buff, off, size, rh);
+}
+ +//______________________________________________________________________________ +int IOFile::ReadEnd(int retval, ReadReqRH *rh) +{ + TRACEIO(Dump, "ReadEnd() " << (rh->m_iocb ? 
"a" : "") << "sync " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " retval: " << retval << " expected_size: " << rh->m_expected_size); + + if (retval < 0) { + TRACEIO(Warning, "ReadEnd() error in File::Read(), exit status=" << retval << ", error=" << XrdSysE2T(-retval) << " sid: " << Xrd::hex1 << rh->m_seq_id); + } else if (retval < rh->m_expected_size) { + TRACEIO(Warning, "ReadEnd() bytes missed " << rh->m_expected_size - retval << " sid: " << Xrd::hex1 << rh->m_seq_id); + } + if (rh->m_iocb) + rh->m_iocb->Done(retval); + + delete rh; + + return retval; +} + + +//============================================================================== +// ReadV +//============================================================================== + +//______________________________________________________________________________ +int IOFile::ReadV(const XrdOucIOVec *readV, int n) +{ + auto *rh = new ReadReqRHCond(ObtainReadSid(), nullptr); + + TRACEIO(Dump, "ReadV() sync " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " n_chunks: " << n); + + rh->m_cond.Lock(); + int retval = ReadVBegin(readV, n, rh); + if (retval == -EWOULDBLOCK) + { + rh->m_cond.Wait(); + retval = rh->m_retval; + } + rh->m_cond.UnLock(); + return ReadVEnd(retval, rh); +} + +//______________________________________________________________________________ +void IOFile::ReadV(XrdOucCacheIOCB &iocb, const XrdOucIOVec *readV, int n) +{ + struct ZHandler : ReadReqRH + { using ReadReqRH::ReadReqRH; + IOFile *m_io = nullptr; + + void Done(int result) override { m_io-> ReadVEnd(result, this); } + }; + + auto *rh = new ZHandler(ObtainReadSid(), &iocb); + rh->m_io = this; + + TRACEIO(Dump, "ReadV() async " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << " n_chunks: " << n); + + int retval = ReadVBegin(readV, n, rh); + if (retval != -EWOULDBLOCK) + { + rh->Done(retval); + } +} + +//______________________________________________________________________________ +int IOFile::ReadVBegin(const XrdOucIOVec 
*readV, int n, ReadReqRH *rh)
+{
+   // Reject the whole request with -EINVAL if any chunk has a negative offset or size
+   // or does not lie entirely within the file; otherwise pass it on to m_file->ReadV().
+   const long long file_size = FSize();
+   for (int i = 0; i < n; ++i) {
+      const XrdOucIOVec &vr = readV[i];
+      if (vr.offset < 0 || vr.size < 0 || vr.offset >= file_size ||
+          vr.offset + vr.size > file_size) {
+         return -EINVAL;
+      }
+      rh->m_expected_size += vr.size; // running total, compared with the result in ReadVEnd()
+   }
+   rh->m_n_chunks = n;
+
+   return m_file->ReadV(this, readV, n, rh);
+}
+ +//______________________________________________________________________________ +int IOFile::ReadVEnd(int retval, ReadReqRH *rh) +{ + TRACEIO(Dump, "ReadVEnd() " << (rh->m_iocb ? "a" : "") << "sync " << this << " sid: " << Xrd::hex1 << rh->m_seq_id << + " retval: " << retval << " n_chunks: " << rh->m_n_chunks << " expected_size: " << rh->m_expected_size); + + if (retval < 0) { + TRACEIO(Warning, "ReadVEnd() error in File::ReadV(), exit status=" << retval << ", error=" << XrdSysE2T(-retval)); + } else if (retval < rh->m_expected_size) { + TRACEIO(Warning, "ReadVEnd() bytes missed " << rh->m_expected_size - retval); + } + if (rh->m_iocb) + rh->m_iocb->Done(retval); + + delete rh; + + return retval; +} \ No newline at end of file diff --git a/src/XrdPfc/XrdPfcIOEntireFile.hh b/src/XrdPfc/XrdPfcIOFile.hh similarity index 70% rename from src/XrdPfc/XrdPfcIOEntireFile.hh rename to src/XrdPfc/XrdPfcIOFile.hh index b7779c3fa41..f8c6fe09ebf 100644 --- a/src/XrdPfc/XrdPfcIOEntireFile.hh +++ b/src/XrdPfc/XrdPfcIOFile.hh @@ -36,12 +36,12 @@ namespace XrdPfc //! \brief Downloads original file into a single file on local disk. //! Handles read requests as they come along. //---------------------------------------------------------------------------- -class IOEntireFile : public IO +class IOFile : public IO { public: - IOEntireFile(XrdOucCacheIO *io, Cache &cache); + IOFile(XrdOucCacheIO *io, Cache &cache); - ~IOEntireFile(); + ~IOFile(); //------------------------------------------------------------------------ //! Check if File was opened successfully. 
@@ -50,47 +50,46 @@ public: //--------------------------------------------------------------------- //! Pass Read request to the corresponding File object. - //! - //! @param Buffer - //! @param Offset - //! @param Length - //! - //! @return number of bytes read //--------------------------------------------------------------------- - using XrdOucCacheIO::Read; - - virtual int Read(char *Buffer, long long Offset, int Length); + int Read(char *buff, long long off, int size) override; + void Read(XrdOucCacheIOCB &iocb, char *buff, long long off, int size) override; + void pgRead(XrdOucCacheIOCB &iocb, char *buff, long long off, int size, + std::vector &csvec, uint64_t opts=0, int *csfix=0) override; + using XrdOucCacheIO::pgRead; //--------------------------------------------------------------------- //! Pass ReadV request to the corresponding File object. - //! - //! @param readV - //! @param n number of XrdOucIOVecs - //! - //! @return total bytes read //--------------------------------------------------------------------- - using XrdOucCacheIO::ReadV; - - virtual int ReadV(const XrdOucIOVec *readV, int n); + int ReadV(const XrdOucIOVec *readV, int n) override; + void ReadV(XrdOucCacheIOCB &iocb, const XrdOucIOVec *readV, int n) override; - virtual void Update(XrdOucCacheIO &iocp); + void Update(XrdOucCacheIO &iocp) override; - //! \brief Abstract virtual method of XrdPfcIO + //! \brief Abstract virtual method of XrdPfc::IO //! Called to check if destruction needs to be done in a separate task. - bool ioActive() /* override */; + bool ioActive() override; - //! \brief Abstract virtual method of XrdPfcIO + //! \brief Abstract virtual method of XrdPfc::IO //! Called to destruct the IO object after it is no longer used. 
- void DetachFinalize() /* override */; + void DetachFinalize() override; - virtual int Fstat(struct stat &sbuff); + int Fstat(struct stat &sbuff) override; - virtual long long FSize(); + long long FSize() override; private: File *m_file; + + int ReadBegin(char *buff, long long off, int size, ReadReqRH *rh); + int ReadEnd(int retval, ReadReqRH *rh); + + int ReadVBegin(const XrdOucIOVec *readV, int n, ReadReqRH *rh); + int ReadVEnd(int retval, ReadReqRH *rh); + struct stat *m_localStat; int initCachedStat(const char* path); + + }; } diff --git a/src/XrdPfc/XrdPfcIOFileBlock.cc b/src/XrdPfc/XrdPfcIOFileBlock.cc index 5cf031abefb..deb7ee2a38c 100644 --- a/src/XrdPfc/XrdPfcIOFileBlock.cc +++ b/src/XrdPfc/XrdPfcIOFileBlock.cc @@ -256,9 +256,8 @@ int IOFileBlock::initLocalStat() // This is writing the top-level cinfo // The info file is used to get file size on defer open // don't initalize buffer, it does not hold useful information in this case - m_info.SetBufferSize(m_cache.RefConfiguration().m_bufferSize); + m_info.SetBufferSizeFileSizeAndCreationTime(m_cache.RefConfiguration().m_bufferSize, tmpStat.st_size); // m_info.DisableDownloadStatus(); -- this stopped working a while back. - m_info.SetFileSizeAndCreationTime(tmpStat.st_size); m_info.Write(m_info_file, path.c_str()); m_info_file->Fsync(); } @@ -354,9 +353,26 @@ int IOFileBlock::Read(char *buff, long long off, int size) TRACEIO(Dump, "Read() block[ " << blockIdx << "] read-block-size[" << readBlockSize << "], offset[" << readBlockSize << "] off = " << off ); - int retvalBlock = (fb != 0) ? 
- fb->Read(this, buff, off, readBlockSize) : - GetInput()->Read(buff, off, readBlockSize);
+      int retvalBlock;
+      if (fb != 0)
+      {
+         // Synchronous read: if the request cannot complete right away (-EWOULDBLOCK),
+         // wait on the handler's condition variable and take the result from rh.m_retval.
+         ReadReqRHCond rh(ObtainReadSid(), nullptr);
+
+         rh.m_cond.Lock();
+         retvalBlock = fb->Read(this, buff, off, readBlockSize, &rh);
+         if (retvalBlock == -EWOULDBLOCK)
+         {
+            rh.m_cond.Wait();
+            retvalBlock = rh.m_retval;
+         }
+         rh.m_cond.UnLock();
+      }
+      else
+      {
+         retvalBlock = GetInput()->Read(buff, off, readBlockSize);
+      }
 TRACEIO(Dump, "Read() Block read returned " << retvalBlock); if (retvalBlock == readBlockSize) diff --git a/src/XrdPfc/XrdPfcIOFileBlock.hh b/src/XrdPfc/XrdPfcIOFileBlock.hh index b56afac26c4..31a3c5e77d6 100644 --- a/src/XrdPfc/XrdPfcIOFileBlock.hh +++ b/src/XrdPfc/XrdPfcIOFileBlock.hh @@ -42,26 +42,26 @@ public: ~IOFileBlock(); - //! \brief Abstract virtual method of XrdPfcIO + //! \brief Abstract virtual method of XrdPfc::IO //! Called to check if destruction needs to be done in a separate task. - bool ioActive() /* override */; + bool ioActive() override; - //! \brief Abstract virtual method of XrdPfcIO + //! \brief Abstract virtual method of XrdPfc::IO //! Called to destruct the IO object after it is no longer used. - void DetachFinalize() /* override */; + void DetachFinalize() override; //--------------------------------------------------------------------- //! Pass Read request to the corresponding File object. 
//--------------------------------------------------------------------- using XrdOucCacheIO::Read; - virtual int Read(char *Buffer, long long Offset, int Length); + int Read(char *Buffer, long long Offset, int Length) override; - virtual int Fstat(struct stat &sbuff); + int Fstat(struct stat &sbuff) override; - virtual long long FSize(); + long long FSize() override; - virtual void Update(XrdOucCacheIO &iocp); + void Update(XrdOucCacheIO &iocp) override; private: long long m_blocksize; //!< size of file-block diff --git a/src/XrdPfc/XrdPfcInfo.cc b/src/XrdPfc/XrdPfcInfo.cc index 394b7e8e743..d063affd33d 100644 --- a/src/XrdPfc/XrdPfcInfo.cc +++ b/src/XrdPfc/XrdPfcInfo.cc @@ -127,6 +127,7 @@ Info::Info(XrdSysTrace* trace, bool prefetchBuffer) : m_buff_synced(0), m_buff_written(0), m_buff_prefetch(0), m_version(0), m_bitvecSizeInBits(0), + m_missingBlocks(0), m_complete(false), m_hasPrefetchBuffer(prefetchBuffer), m_cksCalcMd5(0) @@ -159,16 +160,10 @@ void Info::SetAllBitsSynced() //------------------------------------------------------------------------------ -void Info::SetBufferSize(long long bs) +void Info::SetBufferSizeFileSizeAndCreationTime(long long bs, long long fs) { - // Needed only info is created first time in File::Open() + // Needed only when Info object is created for the first time in File::Open() m_store.m_buffer_size = bs; -} - -//------------------------------------------------------------------------------s - -void Info::SetFileSizeAndCreationTime(long long fs) -{ m_store.m_file_size = fs; ResizeBits(); m_store.m_creationTime = time(0); @@ -191,6 +186,9 @@ void Info::ResizeBits() memset(m_buff_written, 0, GetBitvecSizeInBytes()); memset(m_buff_synced, 0, GetBitvecSizeInBytes()); + m_missingBlocks = m_bitvecSizeInBits; + m_complete = false; + if (m_hasPrefetchBuffer) { m_buff_prefetch = (unsigned char*) malloc(GetBitvecSizeInBytes()); @@ -351,7 +349,7 @@ bool Info::Read(XrdOssDF *fp, const char *dname, const char *fname) memcpy(m_buff_written, 
m_buff_synced, GetBitvecSizeInBytes()); - m_complete = ! IsAnythingEmptyInRng(0, m_bitvecSizeInBits); + UpdateDownloadCompleteStatus(); return true; } @@ -523,7 +521,7 @@ bool Info::ReadV3(XrdOssDF* fp, off_t off, const char *dname, const char *fname) } // cache complete status - m_complete = ! IsAnythingEmptyInRng(0, m_bitvecSizeInBits); + UpdateDownloadCompleteStatus(); // read creation time if (r.Read(m_store.m_creationTime)) return false; @@ -587,7 +585,7 @@ bool Info::ReadV2(XrdOssDF* fp, off_t off, const char *dname, const char *fname) } // cache complete status - m_complete = ! IsAnythingEmptyInRng(0, m_bitvecSizeInBits); + UpdateDownloadCompleteStatus(); // read creation time if (r.Read(m_store.m_creationTime)) return false; diff --git a/src/XrdPfc/XrdPfcInfo.hh b/src/XrdPfc/XrdPfcInfo.hh index 74e9b42f391..36a5bc37ad0 100644 --- a/src/XrdPfc/XrdPfcInfo.hh +++ b/src/XrdPfc/XrdPfcInfo.hh @@ -134,9 +134,7 @@ public: //--------------------------------------------------------------------- void SetAllBitsSynced(); - void SetBufferSize(long long); - - void SetFileSizeAndCreationTime(long long); + void SetBufferSizeFileSizeAndCreationTime(long long bs, long long fs); //--------------------------------------------------------------------- //! \brief Reserve bit vectors for file_size / buffer_size bytes. @@ -199,7 +197,7 @@ public: //--------------------------------------------------------------------- //! Check download status in given block range //--------------------------------------------------------------------- - bool IsAnythingEmptyInRng(int firstIdx, int lastIdx) const; + int CountBlocksNotWrittenInRng(int firstIdx, int lastIdx) const; //--------------------------------------------------------------------- //! Get size of download-state bit-vector in bytes. 
@@ -305,6 +303,7 @@ public: bool HasNoCkSumTime() const { return m_store.m_noCkSumTime != 0; } time_t GetNoCkSumTime() const { return m_store.m_noCkSumTime; } time_t GetNoCkSumTimeForUVKeep() const { return m_store.m_noCkSumTime ? m_store.m_noCkSumTime : m_store.m_creationTime; } + void ResetNoCkSumTime() { m_store.m_noCkSumTime = 0; } #ifdef XRDPFC_CKSUM_TEST static void TestCksumStuff(); @@ -329,7 +328,8 @@ protected: int m_version; int m_bitvecSizeInBits; //!< cached - bool m_complete; //!< cached + int m_missingBlocks; //!< cached, updated in SetBitWritten() + bool m_complete; //!< cached; if false, set to true when missingBlocks hit zero bool m_hasPrefetchBuffer; //!< constains current prefetch score private: @@ -359,7 +359,11 @@ inline void Info::SetBitWritten(int i) assert(cn < GetBitvecSizeInBytes()); const int off = i - cn*8; + m_buff_written[cn] |= cfiBIT(off); + + if (--m_missingBlocks == 0) + m_complete = true; } inline void Info::SetBitPrefetch(int i) @@ -449,19 +453,21 @@ inline bool Info::IsComplete() const return m_complete; } -inline bool Info::IsAnythingEmptyInRng(int firstIdx, int lastIdx) const +inline int Info::CountBlocksNotWrittenInRng(int firstIdx, int lastIdx) const { // TODO rewrite to use full byte comparisons outside of edges ? // Also, it seems to be always called with firstIdx = 0, lastIdx = m_bitvecSizeInBits. + int cnt = 0; for (int i = firstIdx; i < lastIdx; ++i) - if (! TestBitWritten(i)) return true; + if (! TestBitWritten(i)) ++cnt; - return false; + return cnt; } inline void Info::UpdateDownloadCompleteStatus() { - m_complete = ! 
IsAnythingEmptyInRng(0, m_bitvecSizeInBits); + m_missingBlocks = CountBlocksNotWrittenInRng(0, m_bitvecSizeInBits); + m_complete = (m_missingBlocks == 0); } inline long long Info::GetBufferSize() const diff --git a/src/XrdPfc/XrdPfcStats.hh b/src/XrdPfc/XrdPfcStats.hh index b10622102c6..ae06f194dd2 100644 --- a/src/XrdPfc/XrdPfcStats.hh +++ b/src/XrdPfc/XrdPfcStats.hh @@ -65,6 +65,13 @@ public: m_BytesBypassed += s.m_BytesBypassed; } + void AddBytesHit(long long bh) + { + XrdSysMutexHelper _lock(&m_Mutex); + + m_BytesHit += bh; + } + void AddWriteStats(long long bytes_written, int n_cks_errs) { XrdSysMutexHelper _lock(&m_Mutex); diff --git a/src/XrdPfc/XrdPfcVRead.cc b/src/XrdPfc/XrdPfcVRead.cc deleted file mode 100644 index efed9e8c625..00000000000 --- a/src/XrdPfc/XrdPfcVRead.cc +++ /dev/null @@ -1,419 +0,0 @@ -#include "XrdPfcFile.hh" -#include "XrdPfc.hh" -#include "XrdPfcTrace.hh" - -#include "XrdPfcInfo.hh" -#include "XrdPfcStats.hh" -#include "XrdPfcIO.hh" - -#include "XrdOss/XrdOss.hh" -#include "XrdCl/XrdClDefaultEnv.hh" -#include "XrdCl/XrdClFile.hh" -#include "XrdCl/XrdClXRootDResponses.hh" - -namespace XrdPfc -{ -// A list of IOVec chuncks that match a given block index. -// arr vector holds chunk readv indicies. 
-struct ReadVChunkListDisk -{ - ReadVChunkListDisk(int i) : block_idx(i) {} - - int block_idx; - std::vector arr; -}; - -struct ReadVChunkListRAM -{ - ReadVChunkListRAM(Block* b, std::vector * iarr, bool ireq) : block(b), arr(iarr), req(ireq) {} - - Block *block; - std::vector *arr; - bool req; // requested here -}; - -// RAM -struct ReadVBlockListRAM -{ - std::vector bv; - - bool AddEntry(Block* block, int chunkIdx, bool ireq) - { - for (std::vector::iterator i = bv.begin(); i != bv.end(); ++i) - { - if (i->block == block) - { - i->arr->push_back(chunkIdx); - return false; - } - } - bv.push_back(ReadVChunkListRAM(block, new std::vector, ireq)); - bv.back().arr->push_back(chunkIdx); - return true; - } -}; - -// Disk -struct ReadVBlockListDisk -{ - std::vector bv; - - void AddEntry(int blockIdx, int chunkIdx) - { - for (std::vector::iterator i = bv.begin(); i != bv.end(); ++i) - { - if (i->block_idx == blockIdx) - { - i->arr.push_back(chunkIdx); - return; - } - } - bv.push_back(ReadVChunkListDisk(blockIdx)); - bv.back().arr.push_back(chunkIdx); - } -}; -} - -using namespace XrdPfc; - -//------------------------------------------------------------------------------ - -int File::ReadV(IO *io, const XrdOucIOVec *readV, int n) -{ - TRACEF(Dump, "ReadV for " << n << " chunks."); - - if ( ! VReadValidate(readV, n)) - { - return -EINVAL; - } - - Stats loc_stats; - - int bytes_read = 0; - int error_cond = 0; // to be set to -errno - - BlockList_t blks_to_request; - ReadVBlockListRAM blocks_to_process; - std::vector blks_processed; - ReadVBlockListDisk blocks_on_disk; - std::vector chunkVec; - DirectResponseHandler *direct_handler = 0; - - m_state_cond.Lock(); - - if ( ! 
m_is_open) - { - m_state_cond.UnLock(); - TRACEF(Error, "ReadV file is not open"); - return io->GetInput()->ReadV(readV, n); - } - - if (m_in_shutdown) - { - m_state_cond.UnLock(); - return -ENOENT; - } - - VReadPreProcess(io, readV, n, blks_to_request, blocks_to_process, blocks_on_disk, chunkVec); - - m_state_cond.UnLock(); - - // ---------------------------------------------------------------- - - // Request blocks that need to be fetched. - ProcessBlockRequests(blks_to_request); - - // Issue direct / bypass requests if any. - if ( ! chunkVec.empty()) - { - direct_handler = new DirectResponseHandler(1); - io->GetInput()->ReadV(*direct_handler, &chunkVec[0], chunkVec.size()); - } - - // Read data from disk. - { - int dr = VReadFromDisk(readV, n, blocks_on_disk); - if (dr >= 0) - { - bytes_read += dr; - loc_stats.m_BytesHit += dr; - } - else error_cond = dr; - } - - // Fill response buffer with data from blocks in RAM. - { - long long b_hit = 0, b_missed = 0; - int br = VReadProcessBlocks(io, readV, n, blocks_to_process.bv, blks_processed, b_hit, b_missed); - if (br >= 0) - { - bytes_read += br; - loc_stats.m_BytesHit += b_hit; - loc_stats.m_BytesMissed += b_missed; - } - else if ( ! error_cond) error_cond = br; - } - - // Wait for direct requests to arrive. - if (direct_handler != 0) - { - XrdSysCondVarHelper _lck(direct_handler->m_cond); - - while (direct_handler->m_to_wait > 0) - { - direct_handler->m_cond.Wait(); - } - - if (direct_handler->m_errno == 0) - { - for (std::vector::iterator i = chunkVec.begin(); i != chunkVec.end(); ++i) - { - bytes_read += i->size; - loc_stats.m_BytesBypassed += i->size; - } - } - else if ( ! error_cond) error_cond = direct_handler->m_errno; - - delete direct_handler; - } - - { // Release processed blocks. 
- XrdSysCondVarHelper _lck(m_state_cond); - - for (std::vector::iterator i = blks_processed.begin(); i != blks_processed.end(); ++i) - { - dec_ref_count(i->block); - } - } - assert (blocks_to_process.bv.empty()); - - for (std::vector::iterator i = blks_processed.begin(); i != blks_processed.end(); ++i) - delete i->arr; - - m_stats.AddReadStats(loc_stats); - - TRACEF(Dump, "VRead exit, error_cond=" << error_cond << ", bytes_read=" << bytes_read); - return error_cond ? error_cond : bytes_read; -} - -//------------------------------------------------------------------------------ - -bool File::VReadValidate(const XrdOucIOVec *vr, int n) -{ - for (int i = 0; i < n; ++i) - { - if (vr[i].offset < 0 || vr[i].offset >= m_file_size || - vr[i].offset + vr[i].size > m_file_size) - { - return false; - } - } - return true; -} - -//------------------------------------------------------------------------------ - -void File::VReadPreProcess(IO *io, const XrdOucIOVec *readV, int n, - BlockList_t &blks_to_request, - ReadVBlockListRAM &blocks_to_process, - ReadVBlockListDisk &blocks_on_disk, - std::vector &chunkVec) -{ - // Must be called under m_state_cond lock. 
- - for (int iov_idx = 0; iov_idx < n; iov_idx++) - { - const int blck_idx_first = readV[iov_idx].offset / m_cfi.GetBufferSize(); - const int blck_idx_last = (readV[iov_idx].offset + readV[iov_idx].size - 1) / m_cfi.GetBufferSize(); - - for (int block_idx = blck_idx_first; block_idx <= blck_idx_last; ++block_idx) - { - TRACEF(Dump, "VReadPreProcess chunk "<< readV[iov_idx].size << "@"<< readV[iov_idx].offset); - - BlockMap_i bi = m_block_map.find(block_idx); - if (bi != m_block_map.end()) - { - if (blocks_to_process.AddEntry(bi->second, iov_idx, false)) - inc_ref_count(bi->second); - - TRACEF(Dump, "VReadPreProcess block "<< block_idx <<" in map"); - } - else if (m_cfi.TestBitWritten(offsetIdx(block_idx))) - { - blocks_on_disk.AddEntry(block_idx, iov_idx); - - TRACEF(Dump, "VReadPreProcess block "<< block_idx <<" , chunk idx = " << iov_idx << " on disk"); - } - else - { - Block *b = PrepareBlockRequest(block_idx, io, false); - - if (b) - { - inc_ref_count(b); - blocks_to_process.AddEntry(b, iov_idx, true); - blks_to_request.push_back(b); - - TRACEF(Dump, "VReadPreProcess request block " << block_idx); - } - else - { - long long off; // offset in user buffer - long long blk_off; // offset in block - long long size; // size to copy - const long long BS = m_cfi.GetBufferSize(); - overlap(block_idx, BS, readV[iov_idx].offset, readV[iov_idx].size, off, blk_off, size); - chunkVec.push_back(XrdOucIOVec2(readV[iov_idx].data+off, BS*block_idx + blk_off,size)); - - TRACEF(Dump, "VReadPreProcess direct read " << block_idx); - } - } - } - } -} - -//------------------------------------------------------------------------------ - -int File::VReadFromDisk(const XrdOucIOVec *readV, int n, ReadVBlockListDisk& blocks_on_disk) -{ - int bytes_read = 0; - for (std::vector::iterator bit = blocks_on_disk.bv.begin(); bit != blocks_on_disk.bv.end(); ++bit ) - { - int blockIdx = bit->block_idx; - for (std::vector::iterator chunkIt = bit->arr.begin(); chunkIt != bit->arr.end(); ++chunkIt) - 
{ - int chunkIdx = *chunkIt; - - long long off; // offset in user buffer - long long blk_off; // offset in block - long long size; // size to copy - - TRACEF(Dump, "VReadFromDisk block= " << blockIdx <<" chunk=" << chunkIdx); - - overlap(blockIdx, m_cfi.GetBufferSize(), readV[chunkIdx].offset, readV[chunkIdx].size, off, blk_off, size); - - int rs = m_data_file->Read(readV[chunkIdx].data + off, blockIdx*m_cfi.GetBufferSize() + blk_off - m_offset, size); - - if (rs < 0) - { - TRACEF(Error, "VReadFromDisk FAILED rs=" << rs << " block=" << blockIdx << " chunk=" << chunkIdx << " off=" << off << - " blk_off=" << blk_off << " size=" << size << " chunkOff=" << readV[chunkIdx].offset); - return rs; - } - - if (rs != size) - { - TRACEF(Error, "VReadFromDisk FAILED incomplete read rs=" << rs << " block=" << blockIdx << " chunk=" << chunkIdx << " off=" << off << - " blk_off=" << blk_off << " size=" << size << " chunkOff=" << readV[chunkIdx].offset); - return -EIO; - } - - bytes_read += rs; - } - } - - return bytes_read; -} - -//------------------------------------------------------------------------------ - -int File::VReadProcessBlocks(IO *io, const XrdOucIOVec *readV, int n, - std::vector& blocks_to_process, - std::vector& blocks_processed, - long long& bytes_hit, - long long& bytes_missed) -{ - long long bytes_read = 0; - int error_cond = 0; // to be set to -errno - - while ( ! 
blocks_to_process.empty()) - { - std::vector finished; - BlockList_t to_reissue; - { - XrdSysCondVarHelper _lck(m_state_cond); - - std::vector::iterator bi = blocks_to_process.begin(); - while (bi != blocks_to_process.end()) - { - if (bi->block->is_failed() && bi->block->get_io() != io) - { - TRACEF(Info, "VReadProcessBlocks() requested block " << bi->block << " failed with another io " << - bi->block->get_io() << " - reissuing request with my io " << io); - - bi->block->reset_error_and_set_io(io); - to_reissue.push_back(bi->block); - ++bi; - } - else if (bi->block->is_finished()) - { - finished.push_back(ReadVChunkListRAM(bi->block, bi->arr, bi->req)); - // Here we rely on the fact that std::vector does not reallocate on erase! - blocks_to_process.erase(bi); - } - else - { - ++bi; - } - } - - if (finished.empty() && to_reissue.empty()) - { - m_state_cond.Wait(); - continue; - } - } - - ProcessBlockRequests(to_reissue); - to_reissue.clear(); - - std::vector::iterator bi = finished.begin(); - while (bi != finished.end()) - { - if (bi->block->is_ok()) - { - long long b_read = 0; - for (std::vector::iterator chunkIt = bi->arr->begin(); chunkIt < bi->arr->end(); ++chunkIt) - { - long long off; // offset in user buffer - long long blk_off; // offset in block - long long size; // size to copy - - int block_idx = bi->block->m_offset/m_cfi.GetBufferSize(); - overlap(block_idx, m_cfi.GetBufferSize(), readV[*chunkIt].offset, readV[*chunkIt].size, off, blk_off, size); - - memcpy(readV[*chunkIt].data + off, &(bi->block->m_buff[blk_off]), size); - - b_read += size; - } - - bytes_read += b_read; - if (bi->req) - bytes_missed += b_read; - else - bytes_hit += b_read; - } - else - { - // It has failed ... report only the first error. - if ( ! 
error_cond) - { - error_cond = bi->block->m_errno; - TRACEF(Error, "VReadProcessBlocks() io " << io << ", block "<< bi->block << - " finished with error " << -error_cond << " " << XrdSysE2T(-error_cond)); - break; - } - } - - ++bi; - } - - // add finished to processed list - std::copy(finished.begin(), finished.end(), std::back_inserter(blocks_processed)); - finished.clear(); - } - - TRACEF(Dump, "VReadProcessBlocks status " << error_cond << ", total read " << bytes_read); - - return error_cond ? error_cond : bytes_read; -}