diff --git a/src/XrdFileCache/XrdFileCacheFile.cc b/src/XrdFileCache/XrdFileCacheFile.cc index 9c1adb35a19..a2665d5ce59 100644 --- a/src/XrdFileCache/XrdFileCacheFile.cc +++ b/src/XrdFileCache/XrdFileCacheFile.cc @@ -18,6 +18,7 @@ #include "XrdFileCacheFile.hh" +#include "XrdFileCacheIO.hh" #include "XrdFileCacheTrace.hh" #include @@ -71,8 +72,8 @@ namespace Cache* cache() { return &Cache::GetInstance(); } } -File::File(XrdOucCacheIO2 *inputIO, std::string& disk_file_path, long long iOffset, long long iFileSize) : -m_input(inputIO), +File::File(IO *io, std::string& disk_file_path, long long iOffset, long long iFileSize) : +m_io(io), m_output(NULL), m_infoFile(NULL), m_cfi(Cache::GetInstance().GetTrace(), Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks > 0), @@ -338,7 +339,7 @@ Block* File::RequestBlock(int i, bool prefetch) TRACEF(Dump, "File::RequestBlock() " << i << "prefetch" << prefetch << "address " << (void*)b); BlockResponseHandler* oucCB = new BlockResponseHandler(b); - m_input->Read(*oucCB, (char*)b->get_buff(), off, (int)this_bs); + m_io->GetInput()->Read(*oucCB, (char*)b->get_buff(), off, (int)this_bs); m_block_map[i] = b; @@ -370,7 +371,7 @@ int File::RequestBlocksDirect(DirectResponseHandler *handler, IntList_t& blocks, overlap(*ii, BS, req_off, req_size, off, blk_off, size); - m_input->Read( *handler, req_buf + off, *ii * BS + blk_off, size); + m_io->GetInput()->Read( *handler, req_buf + off, *ii * BS + blk_off, size); TRACEF(Dump, "RequestBlockDirect success, idx = " << *ii << " size = " << size); total += size; diff --git a/src/XrdFileCache/XrdFileCacheFile.hh b/src/XrdFileCache/XrdFileCacheFile.hh index 9f7c7d597bd..68aa388b211 100644 --- a/src/XrdFileCache/XrdFileCacheFile.hh +++ b/src/XrdFileCache/XrdFileCacheFile.hh @@ -42,7 +42,8 @@ namespace XrdFileCache { class BlockResponseHandler; class DirectResponseHandler; - + class IO; + struct ReadVBlockListRAM; struct ReadVChunkListRAM; struct ReadVBlockListDisk; @@ -97,7 +98,7 @@ namespace XrdFileCache private: enum PrefetchState_e { kOn, kHold, kStopped, kComplete }; - XrdOucCacheIO2 *m_input; //!< original data source + IO *m_io; //!< original data source XrdOssDF *m_output; //!< file handle for data file on disk XrdOssDF *m_infoFile; //!< file handle for data-info file on disk Info m_cfi; //!< download status of file blocks and access statistics @@ -144,7 +145,7 @@ namespace XrdFileCache //------------------------------------------------------------------------ //! Constructor. //------------------------------------------------------------------------ - File(XrdOucCacheIO2 *io, std::string &path, + File(IO *io, std::string &path, long long offset, long long fileSize); //------------------------------------------------------------------------ diff --git a/src/XrdFileCache/XrdFileCacheIO.cc b/src/XrdFileCache/XrdFileCacheIO.cc new file mode 100644 index 00000000000..3eefceaa685 --- /dev/null +++ b/src/XrdFileCache/XrdFileCacheIO.cc @@ -0,0 +1,31 @@ +#include "XrdFileCacheIO.hh" +#include "XrdSys/XrdSysAtomics.hh" +#include "XrdPosix/XrdPosixFile.hh" + +using namespace XrdFileCache; + +IO::IO(XrdOucCacheIO2 *io, XrdOucCacheStats &stats, Cache &cache): +m_statsGlobal(stats), m_cache(cache), m_traceID("IO"), m_io(io) +{ + m_path = m_io->Path(); +} + +void IO::Update(XrdOucCacheIO2 &iocp) +{ + SetInput(&iocp); +} + + +void IO::SetInput(XrdOucCacheIO2* x) +{ + updMutex.Lock(); + m_io = x; + updMutex.UnLock(); +} + +XrdOucCacheIO2* IO::GetInput() +{ + AtomicBeg(updMutex); + return m_io; + AtomicEnd(updMutex); +} diff --git a/src/XrdFileCache/XrdFileCacheIO.hh b/src/XrdFileCache/XrdFileCacheIO.hh index 3b272788ddc..73376bc38b8 100644 --- a/src/XrdFileCache/XrdFileCacheIO.hh +++ b/src/XrdFileCache/XrdFileCacheIO.hh @@ -6,7 +6,7 @@ class XrdOucTrace; #include "XrdFileCache.hh" #include "XrdOuc/XrdOucCache2.hh" #include "XrdCl/XrdClDefaultEnv.hh" - +#include "XrdSys/XrdSysPthread.hh" namespace XrdFileCache { @@ -16,8 +16,7 @@ namespace XrdFileCache class IO : public XrdOucCacheIO2 { public: - IO (XrdOucCacheIO2 *io, XrdOucCacheStats &stats, Cache &cache) : - m_io(io), m_statsGlobal(stats), m_cache(cache), m_traceID("IO"){} + IO (XrdOucCacheIO2 *io, XrdOucCacheStats &stats, Cache &cache); //! Original data source. virtual XrdOucCacheIO *Base() { return m_io; } @@ -35,18 +34,26 @@ namespace XrdFileCache virtual int Write(char *Buffer, long long Offset, int Length) { errno = ENOTSUP; return -1; } - virtual void Update(XrdOucCacheIO2 &iocp) { m_io = &iocp; } + virtual void Update(XrdOucCacheIO2 &iocp); virtual void RelinquishFile(File*) = 0; XrdOucTrace* GetTrace() {return m_cache.GetTrace();} + + XrdOucCacheIO2* GetInput(); - protected: - XrdOucCacheIO2 *m_io; //!< original data source + protected: XrdOucCacheStats &m_statsGlobal; //!< reference to Cache statistics Cache &m_cache; //!< reference to Cache needed in detach const char* m_traceID; + std::string m_path; + const char* GetPath() { return m_path.c_str(); } + + private: + XrdOucCacheIO2 *m_io; //!< original data source + XrdSysRecMutex updMutex; + void SetInput(XrdOucCacheIO2*); }; } diff --git a/src/XrdFileCache/XrdFileCacheIOEntireFile.cc b/src/XrdFileCache/XrdFileCacheIOEntireFile.cc index 6734b51539e..9dcbf28ebb9 100644 --- a/src/XrdFileCache/XrdFileCacheIOEntireFile.cc +++ b/src/XrdFileCache/XrdFileCacheIOEntireFile.cc @@ -39,7 +39,7 @@ IOEntireFile::IOEntireFile(XrdOucCacheIO2 *io, XrdOucCacheStats &stats, Cache & m_file(0), m_localStat(0) { - XrdCl::URL url(m_io->Path()); + XrdCl::URL url(GetInput()->Path()); std::string fname = Cache::GetInstance().RefConfiguration().m_cache_dir + url.GetPath(); if ((m_file = Cache::GetInstance().GetFileWithLocalPath(fname, this))) @@ -55,10 +55,11 @@ IOEntireFile::IOEntireFile(XrdOucCacheIO2 *io, XrdOucCacheStats &stats, Cache & if (res) TRACEIO(Error, "IOEntireFile::IOEntireFile, could not get valid stat"); - m_file = new File(io, fname, 0, st.st_size); + m_file = new File(this, fname, 0, st.st_size); } Cache::GetInstance().AddActive(this, m_file); + std::cout << " IOEntireFile::IOEntireFile " << this << std::endl; } @@ -70,7 +71,7 @@ IOEntireFile::~IOEntireFile() int IOEntireFile::Fstat(struct stat &sbuff) { - XrdCl::URL url(m_io->Path()); + XrdCl::URL url(GetPath()); std::string name = url.GetPath(); name += ".cinfo"; @@ -108,9 +109,8 @@ int IOEntireFile::initCachedStat(const char* path) if (m_cache.GetOss()->Stat(path, &tmpStat) == XrdOssOK) { XrdOssDF* infoFile = m_cache.GetOss()->newFile(Cache::GetInstance().RefConfiguration().m_username.c_str()); XrdOucEnv myEnv; - if (infoFile->Open(path, O_RDONLY, 0600, myEnv) > 0) { + if (infoFile->Open(path, O_RDONLY, 0600, myEnv) == XrdOssOK) { Info info(m_cache.GetTrace()); - printf("reading info file ..\n"); if (info.Read(infoFile) > 0) { tmpStat.st_size = info.GetFileSize(); TRACEIO(Info, "IOEntireFile::initCachedStat successfuly read size from info file = " << tmpStat.st_size); @@ -126,7 +126,7 @@ int IOEntireFile::initCachedStat(const char* path) } if (res) { - res = m_io->Fstat(tmpStat); + res = GetInput()->Fstat(tmpStat); TRACEIO(Debug, "IOEntireFile::initCachedStat get stat from client res= " << res << "size = " << tmpStat.st_size); } @@ -148,7 +148,7 @@ bool IOEntireFile::ioActive() XrdOucCacheIO *IOEntireFile::Detach() { - XrdOucCacheIO * io = m_io; + XrdOucCacheIO * io = GetInput(); // This will delete us! m_cache.Detach(this); @@ -203,6 +203,6 @@ int IOEntireFile::Read (char *buff, long long off, int size) */ int IOEntireFile::ReadV (const XrdOucIOVec *readV, int n) { - TRACE(Dump, "IO::ReadV(), get " << n << " requests, " << m_io->Path()); + TRACEIO(Dump, "IO::ReadV(), get " << n << " requests" ); return m_file->ReadV(readV, n); } diff --git a/src/XrdFileCache/XrdFileCacheIOEntireFile.hh b/src/XrdFileCache/XrdFileCacheIOEntireFile.hh index 927be01f281..46f1048ef39 100644 --- a/src/XrdFileCache/XrdFileCacheIOEntireFile.hh +++ b/src/XrdFileCache/XrdFileCacheIOEntireFile.hh @@ -101,7 +101,6 @@ namespace XrdFileCache virtual void RelinquishFile(File*); - private: File* m_file; struct stat *m_localStat; diff --git a/src/XrdFileCache/XrdFileCacheIOFileBlock.cc b/src/XrdFileCache/XrdFileCacheIOFileBlock.cc index 7fe0d3df8e1..cf464f56e3b 100644 --- a/src/XrdFileCache/XrdFileCacheIOFileBlock.cc +++ b/src/XrdFileCache/XrdFileCacheIOFileBlock.cc @@ -45,7 +45,7 @@ IOFileBlock::IOFileBlock(XrdOucCacheIO2 *io, XrdOucCacheStats &statsGlobal, Cach XrdOucCacheIO* IOFileBlock::Detach() { TRACEIO(Info, "IOFileBlock::Detach() " ); - XrdOucCacheIO * io = m_io; + XrdOucCacheIO * io = GetInput(); for (std::map::iterator it = m_blocks.begin(); it != m_blocks.end(); ++it) @@ -62,7 +62,7 @@ XrdOucCacheIO* IOFileBlock::Detach() void IOFileBlock::GetBlockSizeFromPath() { const static std::string tag = "hdfsbsize="; - std::string path= m_io->Path(); + std::string path= GetInput()->Path(); size_t pos1 = path.find(tag); size_t t = tag.length(); if ( pos1 != path.npos) @@ -85,7 +85,7 @@ void IOFileBlock::GetBlockSizeFromPath() //______________________________________________________________________________ File* IOFileBlock::newBlockFile(long long off, int blocksize) { - XrdCl::URL url(m_io->Path()); + XrdCl::URL url(GetInput()->Path()); std::string fname = Cache::GetInstance().RefConfiguration().m_cache_dir + url.GetPath(); std::stringstream ss; @@ -101,7 +101,7 @@ File* IOFileBlock::newBlockFile(long long off, int blocksize) File* file; if (!(file = Cache::GetInstance().GetFileWithLocalPath(fname, this))) { - file = new File(m_io, fname, off, blocksize); + file = new File(this, fname, off, blocksize); Cache::GetInstance().AddActive(this, file); } @@ -146,15 +146,18 @@ bool IOFileBlock::ioActive() int IOFileBlock::Read (char *buff, long long off, int size) { // protect from reads over the file size - if (off >= m_io->FSize()) + + long long fileSize = GetInput()->FSize(); + + if (off >= fileSize) return 0; if (off < 0) { errno = EINVAL; return -1; } - if (off + size > m_io->FSize()) - size = m_io->FSize() - off; + if (off + size > fileSize) + size = fileSize - off; long long off0 = off; int idx_first = off0/m_blocksize; @@ -176,10 +179,10 @@ int IOFileBlock::Read (char *buff, long long off, int size) { size_t pbs = m_blocksize; // check if this is last block - int lastIOFileBlock = (m_io->FSize()-1)/m_blocksize; + int lastIOFileBlock = (fileSize-1)/m_blocksize; if (blockIdx == lastIOFileBlock ) { - pbs = m_io->FSize() - blockIdx*m_blocksize; + pbs = fileSize - blockIdx*m_blocksize; // TRACEIO(Dump, "IOFileBlock::Read() last block, change output file size to " << pbs); } diff --git a/src/XrdFileCache/XrdFileCacheTrace.hh b/src/XrdFileCache/XrdFileCacheTrace.hh index 979c87e3275..9bf4e44ecd4 100644 --- a/src/XrdFileCache/XrdFileCacheTrace.hh +++ b/src/XrdFileCache/XrdFileCacheTrace.hh @@ -33,7 +33,7 @@ #define TRACEIO(act, x) \ if (XRD_TRACE What >= TRACE_ ## act) \ - {XRD_TRACE Beg(m_traceID); cerr << TRACE_STR_##act <Path(); XRD_TRACE End();} + {XRD_TRACE Beg(m_traceID); cerr << TRACE_STR_##act <= TRACE_ ## act) \ diff --git a/src/XrdFileCache/XrdFileCacheVRead.cc b/src/XrdFileCache/XrdFileCacheVRead.cc index 1a5fbe69067..c903dc3bd0d 100644 --- a/src/XrdFileCache/XrdFileCacheVRead.cc +++ b/src/XrdFileCache/XrdFileCacheVRead.cc @@ -4,6 +4,7 @@ #include "XrdFileCacheInfo.hh" #include "XrdFileCacheStats.hh" +#include "XrdFileCacheIO.hh" #include "XrdOss/XrdOss.hh" #include "XrdCl/XrdClDefaultEnv.hh" @@ -89,7 +90,7 @@ int File::ReadV (const XrdOucIOVec *readV, int n) direct_handler = new DirectResponseHandler(1); // TODO check interface in the client file // m_input.VectorRead(chunkVec, (void*) 0, direct_handler); - m_input->ReadV(*direct_handler, &chunkVec[0], chunkVec.size()); + m_io->GetInput()->ReadV(*direct_handler, &chunkVec[0], chunkVec.size()); } }