diff --git a/src/XrdFileCache/XrdFileCache.cc b/src/XrdFileCache/XrdFileCache.cc index 7572103dfb3..0d63aff1926 100644 --- a/src/XrdFileCache/XrdFileCache.cc +++ b/src/XrdFileCache/XrdFileCache.cc @@ -65,7 +65,7 @@ void *PrefetchThread(void* ptr) extern "C" { -XrdOucCache *XrdOucGetCache(XrdSysLogger *logger, +XrdOucCache2 *XrdOucGetCache2(XrdSysLogger *logger, const char *config_filename, const char *parameters) { @@ -139,7 +139,7 @@ Cache::Cache() : XrdOucCache(), //______________________________________________________________________________ -XrdOucCacheIO *Cache::Attach(XrdOucCacheIO *io, int Options) +XrdOucCacheIO2 *Cache::Attach(XrdOucCacheIO2 *io, int Options) { if (Cache::GetInstance().Decide(io)) { diff --git a/src/XrdFileCache/XrdFileCache.hh b/src/XrdFileCache/XrdFileCache.hh index 34086a35e1f..97477a1c5f0 100644 --- a/src/XrdFileCache/XrdFileCache.hh +++ b/src/XrdFileCache/XrdFileCache.hh @@ -22,7 +22,8 @@ #include "XrdVersion.hh" #include "XrdSys/XrdSysPthread.hh" -#include "XrdOuc/XrdOucCache.hh" +#include "XrdOuc/XrdOucCache2.hh" +#include "XrdOuc/XrdOucCallBack.hh" #include "XrdCl/XrdClDefaultEnv.hh" #include "XrdFileCacheFile.hh" #include "XrdFileCacheDecision.hh" @@ -38,6 +39,7 @@ class File; class IO; } + namespace XrdFileCache { //---------------------------------------------------------------------------- @@ -75,7 +77,7 @@ namespace XrdFileCache //---------------------------------------------------------------------------- //! Attaches/creates and detaches/deletes cache-io objects for disk based cache. //---------------------------------------------------------------------------- - class Cache : public XrdOucCache + class Cache : public XrdOucCache2 { public: //--------------------------------------------------------------------- @@ -86,7 +88,7 @@ namespace XrdFileCache //--------------------------------------------------------------------- //! Obtain a new IO object that fronts existing XrdOucCacheIO. //--------------------------------------------------------------------- - virtual XrdOucCacheIO *Attach(XrdOucCacheIO *, int Options=0); + virtual XrdOucCacheIO2 *Attach(XrdOucCacheIO2 *, int Options=0); //--------------------------------------------------------------------- //! Number of cache-io objects atteched through this cache. diff --git a/src/XrdFileCache/XrdFileCacheFile.cc b/src/XrdFileCache/XrdFileCacheFile.cc index 9726888a585..a670874846a 100644 --- a/src/XrdFileCache/XrdFileCacheFile.cc +++ b/src/XrdFileCache/XrdFileCacheFile.cc @@ -70,7 +70,7 @@ namespace Cache* cache() { return &Cache::GetInstance(); } } -File::File(XrdOucCacheIO &inputIO, std::string& disk_file_path, long long iOffset, long long iFileSize) : +File::File(XrdOucCacheIO2 &inputIO, std::string& disk_file_path, long long iOffset, long long iFileSize) : m_input(inputIO), m_output(NULL), m_infoFile(NULL), @@ -334,7 +334,6 @@ Block* File::RequestBlock(int i, bool prefetch) // catch the block while still in memory. clLog()->Debug(XrdCl::AppMsg, "RequestBlock() %d pOn=(%d)", i, prefetch); - XrdCl::File &client = ((XrdPosixFile*)(&m_input))->clFile; const long long BS = m_cfi.GetBufferSize(); const int last_block = m_cfi.GetSizeInBits() - 1; @@ -344,23 +343,18 @@ Block* File::RequestBlock(int i, bool prefetch) Block *b = new Block(this, off, this_bs, prefetch); // should block be reused to avoid recreation - XrdCl::XRootDStatus status = client.Read(off, this_bs, (void*)b->get_buff(), new BlockResponseHandler(b)); - if (status.IsOK()) { - clLog()->Dump(XrdCl::AppMsg, "File::RequestBlock() this = %p, b=%p, this idx=%d pOn=(%d) %s", (void*)this, (void*)b, i, prefetch, lPath()); - m_block_map[i] = b; + BlockResponseHandler* oucCB = new BlockResponseHandler(b); + m_input.Read(*oucCB, (char*)b->get_buff(), off, (int)this_bs); - if (m_prefetchState == kOn && m_block_map.size() > Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks) - { - m_prefetchState = kHold; - cache()->DeRegisterPrefetchFile(this); - } - return b; - } - else { - clLog()->Error(XrdCl::AppMsg, "File::RequestBlock() error %d, this = %p, b=%p, this idx=%d pOn=(%d) %s", status.code, (void*)this, (void*)b, i, prefetch, lPath()); - XrdPosixMap::Result(status); - return 0; + clLog()->Dump(XrdCl::AppMsg, "File::RequestBlock() this = %p, b=%p, this idx=%d pOn=(%d) %s", (void*)this, (void*)b, i, prefetch, lPath()); + m_block_map[i] = b; + + if (m_prefetchState == kOn && m_block_map.size() > Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks) + { + m_prefetchState = kHold; + cache()->DeRegisterPrefetchFile(this); } + return b; } //------------------------------------------------------------------------------ @@ -368,8 +362,6 @@ Block* File::RequestBlock(int i, bool prefetch) int File::RequestBlocksDirect(DirectResponseHandler *handler, IntList_t& blocks, char* req_buf, long long req_off, long long req_size) { - XrdCl::File &client = ((XrdPosixFile*)(&m_input))->clFile; - const long long BS = m_cfi.GetBufferSize(); // XXX Use readv to load more at the same time. @@ -385,16 +377,9 @@ int File::RequestBlocksDirect(DirectResponseHandler *handler, IntList_t& blocks, overlap(*ii, BS, req_off, req_size, off, blk_off, size); - XrdCl::Status status = client.Read(*ii * BS + blk_off, size, req_buf + off, handler); - if (!status.IsOK()) - { - clLog()->Error(XrdCl::AppMsg, "File::RequestBlocksDirect error %s\n", lPath()); - XrdPosixMap::Result(status); - return -1; // AMT all reads should be canceled in this case - } - else { - clLog()->Dump(XrdCl::AppMsg, "RequestBlockDirect success %d %ld %s", *ii, size, lPath()); - } + m_input.Read( *handler, req_buf + off, *ii * BS + blk_off, size); + clLog()->Dump(XrdCl::AppMsg, "RequestBlockDirect success %d %ld %s", *ii, size, lPath()); + total += size; } @@ -837,13 +822,13 @@ void File::free_block(Block* b) //------------------------------------------------------------------------------ -void File::ProcessBlockResponse(Block* b, XrdCl::XRootDStatus *status) +void File::ProcessBlockResponse(Block* b, int res) { m_downloadCond.Lock(); clLog()->Debug(XrdCl::AppMsg, "File::ProcessBlockResponse %p, %d %s",(void*)b,(int)(b->m_offset/BufferSize()), lPath()); - if (status->IsOK()) + if (res >= 0) { b->m_downloaded = true; clLog()->Debug(XrdCl::AppMsg, "File::ProcessBlockResponse %d finished %d %s",(int)(b->m_offset/BufferSize()), b->is_finished(), lPath()); @@ -862,8 +847,8 @@ void File::ProcessBlockResponse(Block* b, XrdCl::XRootDStatus *status) { // AMT how long to keep? // when to retry? - clLog()->Error(XrdCl::AppMsg, "File::ProcessBlockResponse block %p %d error=%d, [%s] %s",(void*)b,(int)(b->m_offset/BufferSize()), status->code, status->GetErrorMessage().c_str(), lPath()); - XrdPosixMap::Result(*status); + clLog()->Error(XrdCl::AppMsg, "File::ProcessBlockResponse block %p %d error=%d, %s",(void*)b,(int)(b->m_offset/BufferSize()), res, lPath()); + // XrdPosixMap::Result(*status); // AMT could notfiy global cache we dont need RAM for that block b->set_error_and_free(errno); errno = 0; @@ -999,32 +984,26 @@ void File::UnMarkPrefetch() //================== RESPONSE HANDLER ================================== //============================================================================== -void BlockResponseHandler::HandleResponse(XrdCl::XRootDStatus *status, - XrdCl::AnyObject *response) +void BlockResponseHandler::Done(int res) { - XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg,"BlockResponseHandler::HandleResponse()"); - - m_block->m_file->ProcessBlockResponse(m_block, status); + XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg,"BlockResponseHandler::Done()"); - delete status; - delete response; + m_block->m_file->ProcessBlockResponse(m_block, res); delete this; } //------------------------------------------------------------------------------ -void DirectResponseHandler::HandleResponse(XrdCl::XRootDStatus *status, - XrdCl::AnyObject *response) +void DirectResponseHandler::Done(int res) { - XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg,"DirectResponseHandler::HandleRespons()"); + XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg,"DirectResponseHandler::Done()"); XrdSysCondVarHelper _lck(m_cond); --m_to_wait; - if ( ! status->IsOK()) + if (res < 0) { - XrdPosixMap::Result(*status); m_errno = errno; } diff --git a/src/XrdFileCache/XrdFileCacheFile.hh b/src/XrdFileCache/XrdFileCacheFile.hh index 1a98651cc45..45af499445c 100644 --- a/src/XrdFileCache/XrdFileCacheFile.hh +++ b/src/XrdFileCache/XrdFileCacheFile.hh @@ -21,6 +21,9 @@ #include "XrdCl/XrdClXRootDResponses.hh" #include "XrdCl/XrdClDefaultEnv.hh" +#include "XrdOuc/XrdOucCache2.hh" +#include "XrdOuc/XrdOucIOVec.hh" + #include "XrdFileCacheInfo.hh" #include "XrdFileCacheStats.hh" @@ -94,7 +97,7 @@ namespace XrdFileCache private: enum PrefetchState_e { kOn, kHold, kCanceled }; - XrdOucCacheIO &m_input; //!< original data source + XrdOucCacheIO2 &m_input; //!< original data source XrdOssDF *m_output; //!< file handle for data file on disk XrdOssDF *m_infoFile; //!< file handle for data-info file on disk Info m_cfi; //!< download status of file blocks and access statistics @@ -141,7 +144,7 @@ namespace XrdFileCache //------------------------------------------------------------------------ //! Constructor. //------------------------------------------------------------------------ - File(XrdOucCacheIO &io, std::string &path, + File(XrdOucCacheIO2 &io, std::string &path, long long offset, long long fileSize); //------------------------------------------------------------------------ @@ -174,7 +177,7 @@ namespace XrdFileCache //---------------------------------------------------------------------- Stats& GetStats() { return m_stats; } - void ProcessBlockResponse(Block* b, XrdCl::XRootDStatus *status); + void ProcessBlockResponse(Block* b, int res); void WriteBlockToDisk(Block* b); void Prefetch(); @@ -205,7 +208,7 @@ namespace XrdFileCache char* req_buf, long long req_off, long long req_size); // VRead - bool VReadPreProcess(const XrdOucIOVec *readV, int n, ReadVBlockListRAM& blks_to_process, ReadVBlockListDisk& blks_on_disk, XrdCl::ChunkList& chunkVec); + bool VReadPreProcess(const XrdOucIOVec *readV, int n, ReadVBlockListRAM& blks_to_process, ReadVBlockListDisk& blks_on_disk, std::vector& chunkVec); int VReadFromDisk(const XrdOucIOVec *readV, int n, ReadVBlockListDisk& blks_on_disk); int VReadProcessBlocks(const XrdOucIOVec *readV, int n, std::vector& blks_to_process, std::vector& blks_rocessed); @@ -230,18 +233,17 @@ namespace XrdFileCache // ================================================================ - class BlockResponseHandler : public XrdCl::ResponseHandler + class BlockResponseHandler : public XrdOucCacheIOCB { public: Block *m_block; BlockResponseHandler(Block *b) : m_block(b) {} - void HandleResponse(XrdCl::XRootDStatus *status, - XrdCl::AnyObject *response); + virtual void Done(int result); }; - class DirectResponseHandler : public XrdCl::ResponseHandler + class DirectResponseHandler : public XrdOucCacheIOCB { public: XrdSysCondVar m_cond; @@ -254,8 +256,7 @@ namespace XrdFileCache bool is_ok() { XrdSysCondVarHelper _lck(m_cond); return m_to_wait == 0 && m_errno == 0; } bool is_failed() { XrdSysCondVarHelper _lck(m_cond); return m_errno != 0; } - void HandleResponse(XrdCl::XRootDStatus *status, - XrdCl::AnyObject *response); + virtual void Done(int result); }; } diff --git a/src/XrdFileCache/XrdFileCacheIO.hh b/src/XrdFileCache/XrdFileCacheIO.hh index df2b0fd18c8..815cf196ffe 100644 --- a/src/XrdFileCache/XrdFileCacheIO.hh +++ b/src/XrdFileCache/XrdFileCacheIO.hh @@ -2,7 +2,7 @@ #define __XRDFILECACHE_CACHE_IO_HH__ #include "XrdFileCache.hh" -#include "XrdOuc/XrdOucCache.hh" +#include "XrdOuc/XrdOucCache2.hh" #include "XrdCl/XrdClDefaultEnv.hh" namespace XrdFileCache @@ -10,10 +10,10 @@ namespace XrdFileCache //---------------------------------------------------------------------------- //! Base cache-io class that implements XrdOucCacheIO abstract methods. //---------------------------------------------------------------------------- - class IO : public XrdOucCacheIO + class IO : public XrdOucCacheIO2 { public: - IO (XrdOucCacheIO &io, XrdOucCacheStats &stats, Cache &cache) : + IO (XrdOucCacheIO2 &io, XrdOucCacheStats &stats, Cache &cache) : m_io(io), m_statsGlobal(stats), m_cache(cache) {} //! Original data source. @@ -36,7 +36,7 @@ namespace XrdFileCache protected: XrdCl::Log* clLog() const { return XrdCl::DefaultEnv::GetLog(); } - XrdOucCacheIO &m_io; //!< original data source + XrdOucCacheIO2 &m_io; //!< original data source XrdOucCacheStats &m_statsGlobal; //!< reference to Cache statistics Cache &m_cache; //!< reference to Cache needed in detach }; diff --git a/src/XrdFileCache/XrdFileCacheIOEntireFile.cc b/src/XrdFileCache/XrdFileCacheIOEntireFile.cc index ee620601077..f20dbf9a7cd 100644 --- a/src/XrdFileCache/XrdFileCacheIOEntireFile.cc +++ b/src/XrdFileCache/XrdFileCacheIOEntireFile.cc @@ -31,7 +31,7 @@ using namespace XrdFileCache; //______________________________________________________________________________ -IOEntireFile::IOEntireFile(XrdOucCacheIO &io, XrdOucCacheStats &stats, Cache & cache) +IOEntireFile::IOEntireFile(XrdOucCacheIO2 &io, XrdOucCacheStats &stats, Cache & cache) : IO(io, stats, cache), m_file(0) { diff --git a/src/XrdFileCache/XrdFileCacheIOEntireFile.hh b/src/XrdFileCache/XrdFileCacheIOEntireFile.hh index c9019bd4db2..59f8704359e 100644 --- a/src/XrdFileCache/XrdFileCacheIOEntireFile.hh +++ b/src/XrdFileCache/XrdFileCacheIOEntireFile.hh @@ -43,7 +43,7 @@ namespace XrdFileCache //------------------------------------------------------------------------ //! Constructor //------------------------------------------------------------------------ - IOEntireFile(XrdOucCacheIO &io, XrdOucCacheStats &stats, Cache &cache); + IOEntireFile(XrdOucCacheIO2 &io, XrdOucCacheStats &stats, Cache &cache); //------------------------------------------------------------------------ //! Destructor diff --git a/src/XrdFileCache/XrdFileCacheIOFileBlock.cc b/src/XrdFileCache/XrdFileCacheIOFileBlock.cc index ae9b8afe206..921e0a28db1 100644 --- a/src/XrdFileCache/XrdFileCacheIOFileBlock.cc +++ b/src/XrdFileCache/XrdFileCacheIOFileBlock.cc @@ -33,7 +33,7 @@ using namespace XrdFileCache; //______________________________________________________________________________ -IOFileBlock::IOFileBlock(XrdOucCacheIO &io, XrdOucCacheStats &statsGlobal, Cache & cache) +IOFileBlock::IOFileBlock(XrdOucCacheIO2 &io, XrdOucCacheStats &statsGlobal, Cache & cache) : IO(io, statsGlobal, cache) { m_blocksize = Cache::GetInstance().RefConfiguration().m_hdfsbsize; @@ -83,7 +83,7 @@ void IOFileBlock::GetBlockSizeFromPath() } //______________________________________________________________________________ -File* IOFileBlock::newBlockFile(long long off, int blocksize, XrdOucCacheIO* io) +File* IOFileBlock::newBlockFile(long long off, int blocksize, XrdOucCacheIO2* io) { XrdCl::URL url(io->Path()); std::string fname = Cache::GetInstance().RefConfiguration().m_cache_dir + url.GetPath(); diff --git a/src/XrdFileCache/XrdFileCacheIOFileBlock.hh b/src/XrdFileCache/XrdFileCacheIOFileBlock.hh index b11cf139ab6..8fb43736bbe 100644 --- a/src/XrdFileCache/XrdFileCacheIOFileBlock.hh +++ b/src/XrdFileCache/XrdFileCacheIOFileBlock.hh @@ -20,7 +20,7 @@ #include #include -#include "XrdOuc/XrdOucCache.hh" +#include "XrdOuc/XrdOucCache2.hh" #include "XrdSys/XrdSysPthread.hh" #include "XrdFileCacheIO.hh" @@ -41,7 +41,7 @@ namespace XrdFileCache //------------------------------------------------------------------------ //! Constructor. //------------------------------------------------------------------------ - IOFileBlock(XrdOucCacheIO &io, XrdOucCacheStats &stats, Cache &cache); + IOFileBlock(XrdOucCacheIO2 &io, XrdOucCacheStats &stats, Cache &cache); //------------------------------------------------------------------------ //! Destructor. @@ -70,7 +70,7 @@ namespace XrdFileCache XrdSysMutex m_mutex; //!< map mutex void GetBlockSizeFromPath(); - File* newBlockFile(long long off, int blocksize, XrdOucCacheIO* io); + File* newBlockFile(long long off, int blocksize, XrdOucCacheIO2* io); }; } diff --git a/src/XrdFileCache/XrdFileCacheVRead.cc b/src/XrdFileCache/XrdFileCacheVRead.cc index 451502ebc01..5e8103cc561 100644 --- a/src/XrdFileCache/XrdFileCacheVRead.cc +++ b/src/XrdFileCache/XrdFileCacheVRead.cc @@ -75,7 +75,7 @@ int File::ReadV (const XrdOucIOVec *readV, int n) ReadVBlockListRAM blocks_to_process; std::vector blks_processed; ReadVBlockListDisk blocks_on_disk; - XrdCl::ChunkList chunkVec; + std::vector chunkVec; DirectResponseHandler* direct_handler = 0; if (!VReadPreProcess(readV, n, blocks_to_process, blocks_on_disk, chunkVec)) bytesRead = -1; @@ -86,12 +86,9 @@ int File::ReadV (const XrdOucIOVec *readV, int n) if (bytesRead >= 0) { if (!chunkVec.empty()) { direct_handler = new DirectResponseHandler(1); - XrdCl::File &client = ((XrdPosixFile*)(&m_input))->clFile; // TODO check interface in the client file - XrdCl::XRootDStatus vrStatus = client.VectorRead(chunkVec, (void*) 0, direct_handler); - if (!vrStatus.IsOK()) { - bytesRead = -1; - } + // m_input.VectorRead(chunkVec, (void*) 0, direct_handler); + m_input.ReadV(*direct_handler, &chunkVec[0], chunkVec.size()); } } @@ -125,8 +122,8 @@ int File::ReadV (const XrdOucIOVec *readV, int n) if (direct_handler->m_errno == 0) { - for (XrdCl::ChunkList::iterator i = chunkVec.begin(); i != chunkVec.end(); ++i) - bytesRead += i->length; + for (std::vector::iterator i = chunkVec.begin(); i != chunkVec.end(); ++i) + bytesRead += i->size; } else { @@ -163,7 +160,7 @@ int File::ReadV (const XrdOucIOVec *readV, int n) //______________________________________________________________________________ -bool File::VReadPreProcess(const XrdOucIOVec *readV, int n, ReadVBlockListRAM& blocks_to_process, ReadVBlockListDisk& blocks_on_disk, XrdCl::ChunkList& chunkVec) +bool File::VReadPreProcess(const XrdOucIOVec *readV, int n, ReadVBlockListRAM& blocks_to_process, ReadVBlockListDisk& blocks_on_disk, std::vector& chunkVec) { XrdSysCondVarHelper _lck(m_downloadCond); for (int iov_idx=0; iov_idxDebug(XrdCl::AppMsg, "VReadPreProcess direct read %d", block_idx); } }