diff --git a/src/XrdFileCache/XrdFileCache.cc b/src/XrdFileCache/XrdFileCache.cc index 8023cf9a8f7..7d79da364c3 100644 --- a/src/XrdFileCache/XrdFileCache.cc +++ b/src/XrdFileCache/XrdFileCache.cc @@ -176,23 +176,43 @@ Cache::Cache(XrdSysLogger *logger) : XrdOucCacheIO2 *Cache::Attach(XrdOucCacheIO2 *io, int Options) { + const char* tpfx = "Cache::Attach() "; + if (Cache::GetInstance().Decide(io)) { - TRACE(Info, "Cache::Attach() " << io->Path()); - IO* cio; + TRACE(Info, tpfx << io->Path()); + + IO *cio; + if (Cache::GetInstance().RefConfiguration().m_hdfsmode) + { cio = new IOFileBlock(io, m_stats, *this); + } else - cio = new IOEntireFile(io, m_stats, *this); + { + // TODO if overloaded, redirect !!! + + IOEntireFile *ioef = new IOEntireFile(io, m_stats, *this); - TRACE_PC(Debug, const char* loc = io->Location(), - "Cache::Attach() " << io->Path() << " location: " << + if ( ! ioef->HasFile()) + { + delete ioef; + // TODO redirect instead !!! + TRACE(Error, tpfx << "Failed opening local file, falling back to remote access " << io->Path()); + return io; + } + + cio = ioef; + } + + TRACE_PC(Debug, const char* loc = io->Location(), tpfx << io->Path() << " location: " << ((loc && loc[0] != 0) ? loc : "")); + return cio; } else { - TRACE(Info, "Cache::Attach() decision decline " << io->Path()); + TRACE(Info, tpfx << "decision decline " << io->Path()); } return io; } @@ -335,8 +355,9 @@ File* Cache::GetFile(const std::string& path, IO* io, long long off, long long f filesize = st.st_size; } - File* file = new File(path, off, filesize); + File *file = File::FileOpen(path, off, filesize); + if (file) { XrdSysCondVarHelper lock(&m_active_cond); diff --git a/src/XrdFileCache/XrdFileCacheConfiguration.cc b/src/XrdFileCache/XrdFileCacheConfiguration.cc index 1d1effe119f..e7ec9f07d04 100644 --- a/src/XrdFileCache/XrdFileCacheConfiguration.cc +++ b/src/XrdFileCache/XrdFileCacheConfiguration.cc @@ -296,9 +296,9 @@ bool Cache::Config(const char *config_filename, const char *parameters) if (m_configuration.m_RamAbsAvailable == 0) { m_configuration.m_RamAbsAvailable = m_isClient ? 256ll * 1024 * 1024 : 1024ll * 1024 * 1024; - char buff2[1024]; - snprintf(buff2, sizeof(buff2), "RAM usage is not specified. Default value %s is used.", m_isClient ? "256m" : "1g"); - TRACE(Warning, buff2); + char buff[1024]; + snprintf(buff, sizeof(buff), "RAM usage pfc.ram is not specified. Default value %s is used.", m_isClient ? "256m" : "1g"); + m_log.Say("Config info: ", buff); } m_configuration.m_NRamBuffers = static_cast(m_configuration.m_RamAbsAvailable / m_configuration.m_bufferSize); @@ -354,7 +354,7 @@ bool Cache::Config(const char *config_filename, const char *parameters) loff += snprintf(buff + loff, sizeof(buff) - loff, "%s", unameBuff); } - m_log.Say( buff); + m_log.Say(buff); } m_log.Say("------ File Caching Proxy interface initialization ", retval ? "completed" : "failed"); diff --git a/src/XrdFileCache/XrdFileCacheFile.cc b/src/XrdFileCache/XrdFileCacheFile.cc index bf58905d670..19bef489002 100644 --- a/src/XrdFileCache/XrdFileCacheFile.cc +++ b/src/XrdFileCache/XrdFileCacheFile.cc @@ -70,7 +70,6 @@ File::File(const std::string& path, long long iOffset, long long iFileSize) : m_prefetchScore(1), m_detachTimeIsLogged(false) { - Open(); } File::~File() @@ -94,6 +93,17 @@ File::~File() TRACEF(Debug, "File::~File() ended, prefetch score = " << m_prefetchScore); } +File* File::FileOpen(const std::string &path, long long offset, long long fileSize) +{ + File *file = new File(path, offset, fileSize); + if ( ! file->Open()) + { + delete file; + file = 0; + } + return file; +} + //------------------------------------------------------------------------------ void File::BlockRemovedFromWriteQ(Block* b) @@ -127,7 +137,7 @@ bool File::ioActive(IO *io) if (mi != m_io_map.end()) { TRACEF(Info, "ioActive for io " << io << - ", active_prefetces " << mi->second.m_active_prefetches << + ", active_prefetches " << mi->second.m_active_prefetches << ", allow_prefetching " << mi->second.m_allow_prefetching << "; (block_map.size() = " << m_block_map.size() << ")."); @@ -273,18 +283,31 @@ void File::RemoveIO(IO *io) bool File::Open() { - TRACEF(Dump, "File::Open() open file for disk cache "); + TRACEF(Dump, "File::Open() open file for disk cache"); + + if (m_is_open) + { + TRACEF(Error, "File::Open() file is already opened."); + return true; + } const Configuration &conf = Cache::GetInstance().RefConfiguration(); XrdOss &myOss = * Cache::GetInstance().GetOss(); const char *myUser = conf.m_username.c_str(); XrdOucEnv myEnv; + struct stat data_stat, info_stat; + + std::string ifn = m_filename + Info::m_infoExtension; + + bool data_existed = (myOss.Stat(m_filename.c_str(), &data_stat) == XrdOssOK); + bool info_existed = (myOss.Stat(ifn.c_str(), &info_stat) == XrdOssOK); // Create the data file itself. char size_str[32]; sprintf(size_str, "%lld", m_fileSize); myEnv.Put("oss.asize", size_str); myEnv.Put("oss.cgroup", conf.m_data_space.c_str()); + if (myOss.Create(myUser, m_filename.c_str(), 0600, myEnv, XRDOSS_mkpath) != XrdOssOK) { TRACEF(Error, "File::Open() Create failed for data file " << m_filename << ERRNO_AND_ERRSTR); @@ -299,12 +322,7 @@ bool File::Open() return false; } - // Create the info file - std::string ifn = m_filename + Info::m_infoExtension; - - struct stat infoStat; - bool fileExisted = (myOss.Stat(ifn.c_str(), &infoStat) == XrdOssOK); - + // Create the info file. myEnv.Put("oss.asize", "64k"); // TODO: Calculate? Get it from configuration? Do not know length of access lists ... myEnv.Put("oss.cgroup", conf.m_meta_space.c_str()); if (myOss.Create(myUser, ifn.c_str(), 0600, myEnv, XRDOSS_mkpath) != XrdOssOK) @@ -323,11 +341,26 @@ bool File::Open() return false; } - if (fileExisted && m_cfi.Read(m_infoFile, ifn)) + bool initialize_info_file = true; + + if (info_existed && m_cfi.Read(m_infoFile, ifn)) { - TRACEF(Debug, "Read existing info file."); + TRACEF(Debug, "Open - reading existing info file. (data_existed=" << data_existed << + ", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) << + ", data_size_from_last_block=" << m_cfi.GetExpectedDataFileSize() << ")"); + + // Check if data file exists and is of reasonable size. + if (data_existed && data_stat.st_size >= m_cfi.GetExpectedDataFileSize()) + { + initialize_info_file = false; + } + else + { + TRACEF(Warning, "Open - basic sanity checks on data file failed, resetting info file."); + m_cfi.ResetAllAccessStats(); + } } - else + if (initialize_info_file) { m_cfi.SetBufferSize(conf.m_bufferSize); m_cfi.SetFileSize(m_fileSize); diff --git a/src/XrdFileCache/XrdFileCacheFile.hh b/src/XrdFileCache/XrdFileCacheFile.hh index 66be9e34e1d..1ad2b6943da 100644 --- a/src/XrdFileCache/XrdFileCacheFile.hh +++ b/src/XrdFileCache/XrdFileCacheFile.hh @@ -138,12 +138,19 @@ public: //------------------------------------------------------------------------ File(const std::string &path, long long offset, long long fileSize); + //------------------------------------------------------------------------ + //! Static constructor that also does Open. Returns null ptr if Open fails. + //------------------------------------------------------------------------ + static File* FileOpen(const std::string &path, long long offset, long long fileSize); + //------------------------------------------------------------------------ //! Destructor. //------------------------------------------------------------------------ ~File(); + //! Handle removal of a block from Cache's write queue. void BlockRemovedFromWriteQ(Block*); + //! Open file handle for data file and info file on local disk. bool Open(); diff --git a/src/XrdFileCache/XrdFileCacheIOEntireFile.cc b/src/XrdFileCache/XrdFileCacheIOEntireFile.cc index 18be8e3ffec..4ceff195401 100644 --- a/src/XrdFileCache/XrdFileCacheIOEntireFile.cc +++ b/src/XrdFileCache/XrdFileCacheIOEntireFile.cc @@ -37,7 +37,7 @@ IOEntireFile::IOEntireFile(XrdOucCacheIO2 *io, XrdOucCacheStats &stats, Cache & m_file(0), m_localStat(0) { - XrdCl::URL url(GetInput()->Path()); + XrdCl::URL url(GetInput()->Path()); std::string fname = url.GetPath(); m_file = Cache::GetInstance().GetFile(fname, this); } @@ -55,14 +55,14 @@ IOEntireFile::~IOEntireFile() //______________________________________________________________________________ int IOEntireFile::Fstat(struct stat &sbuff) { - XrdCl::URL url(GetPath()); + XrdCl::URL url(GetPath()); std::string name = url.GetPath(); name += Info::m_infoExtension; int res = 0; if( ! m_localStat) { - res = initCachedStat(name.c_str()); + res = initCachedStat(name.c_str()); if (res) return res; } diff --git a/src/XrdFileCache/XrdFileCacheIOEntireFile.hh b/src/XrdFileCache/XrdFileCacheIOEntireFile.hh index 4ce5fbbdc16..1e4894fcdf4 100644 --- a/src/XrdFileCache/XrdFileCacheIOEntireFile.hh +++ b/src/XrdFileCache/XrdFileCacheIOEntireFile.hh @@ -50,6 +50,11 @@ public: //------------------------------------------------------------------------ ~IOEntireFile(); + //------------------------------------------------------------------------ + //! Check if File was opened successfully. + //------------------------------------------------------------------------ + bool HasFile() const { return m_file != 0; } + //--------------------------------------------------------------------- //! Pass Read request to the corresponding File object. //! diff --git a/src/XrdFileCache/XrdFileCacheIOFileBlock.cc b/src/XrdFileCache/XrdFileCacheIOFileBlock.cc index 9a764bda129..ba3f8aea78c 100644 --- a/src/XrdFileCache/XrdFileCacheIOFileBlock.cc +++ b/src/XrdFileCache/XrdFileCacheIOFileBlock.cc @@ -65,8 +65,11 @@ XrdOucCacheIO* IOFileBlock::Detach() XrdSysMutexHelper lock(&m_mutex); for (std::map::iterator it = m_blocks.begin(); it != m_blocks.end(); ++it) { - it->second->RequestSyncOfDetachStats(); - m_cache.ReleaseFile(it->second, this); + if (it->second) + { + it->second->RequestSyncOfDetachStats(); + m_cache.ReleaseFile(it->second, this); + } } } XrdOucCacheIO *io = GetInput(); @@ -101,13 +104,15 @@ void IOFileBlock::CloseInfoFile() void IOFileBlock::GetBlockSizeFromPath() { const static std::string tag = "hdfsbsize="; + std::string path = GetInput()->Path(); - size_t pos1 = path.find(tag); - size_t t = tag.length(); - if ( pos1 != path.npos) + size_t pos1 = path.find(tag); + size_t t = tag.length(); + + if (pos1 != path.npos) { pos1 += t; - size_t pos2 = path.find("&", pos1 ); + size_t pos2 = path.find("&", pos1); if (pos2 != path.npos ) { std::string bs = path.substr(pos1, pos2 - pos1); @@ -125,6 +130,8 @@ void IOFileBlock::GetBlockSizeFromPath() //______________________________________________________________________________ File* IOFileBlock::newBlockFile(long long off, int blocksize) { + // NOTE: Can return 0 if opening of a local file fails! + XrdCl::URL url(GetInput()->Path()); std::string fname = url.GetPath(); @@ -132,7 +139,7 @@ File* IOFileBlock::newBlockFile(long long off, int blocksize) ss << fname; char offExt[64]; // filename like ____ - sprintf(&offExt[0],"___%lld_%lld", m_blocksize, off ); + sprintf(&offExt[0], "___%lld_%lld", m_blocksize, off); ss << &offExt[0]; fname = ss.str(); @@ -241,11 +248,13 @@ int IOFileBlock::initLocalStat() bool IOFileBlock::ioActive() { XrdSysMutexHelper lock(&m_mutex); + bool active = false; + for (std::map::iterator it = m_blocks.begin(); it != m_blocks.end(); ++it) { // Need to initiate stop on all File / block objects. - if (it->second->ioActive(this)) + if (it->second && it->second->ioActive(this)) { active = true; } @@ -280,7 +289,7 @@ int IOFileBlock::Read(char *buff, long long off, int size) for (int blockIdx = idx_first; blockIdx <= idx_last; ++blockIdx) { // locate block - File* fb; + File *fb; m_mutex.Lock(); std::map::iterator it = m_blocks.find(blockIdx); if (it != m_blocks.end()) @@ -298,8 +307,9 @@ int IOFileBlock::Read(char *buff, long long off, int size) // TRACEIO(Dump, "IOFileBlock::Read() last block, change output file size to " << pbs); } + // Note: File* can be 0 and stored as 0 if local open fails! fb = newBlockFile(blockIdx*m_blocksize, pbs); - m_blocks.insert(std::pair(blockIdx, (File*) fb)); + m_blocks.insert(std::make_pair(blockIdx, fb)); } m_mutex.UnLock(); @@ -309,12 +319,12 @@ int IOFileBlock::Read(char *buff, long long off, int size) { if (blockIdx == idx_first) { - readBlockSize = (blockIdx + 1) *m_blocksize - off0; + readBlockSize = (blockIdx + 1) * m_blocksize - off0; TRACEIO(Dump, "Read partially till the end of the block"); } else if (blockIdx == idx_last) { - readBlockSize = (off0+size) - blockIdx*m_blocksize; + readBlockSize = (off0 + size) - blockIdx * m_blocksize; TRACEIO(Dump, "Read partially till the end of the block %s"); } else @@ -325,7 +335,9 @@ int IOFileBlock::Read(char *buff, long long off, int size) TRACEIO(Dump, "IOFileBlock::Read() block[ " << blockIdx << "] read-block-size[" << readBlockSize << "], offset[" << readBlockSize << "] off = " << off ); - int retvalBlock = fb->Read(this, buff, off, readBlockSize); + int retvalBlock = (fb != 0) ? + fb->Read(this, buff, off, readBlockSize) : + GetInput()->Read(buff, off, readBlockSize); TRACEIO(Dump, "IOFileBlock::Read() Block read returned " << retvalBlock); if (retvalBlock == readBlockSize) diff --git a/src/XrdFileCache/XrdFileCacheInfo.cc b/src/XrdFileCache/XrdFileCacheInfo.cc index d2f4976292a..80346e53e1d 100644 --- a/src/XrdFileCache/XrdFileCacheInfo.cc +++ b/src/XrdFileCache/XrdFileCacheInfo.cc @@ -151,7 +151,7 @@ void Info::SetBufferSize(long long bs) void Info::SetFileSize(long long fs) { m_store.m_fileSize = fs; - ResizeBits((m_store.m_fileSize - 1)/m_store.m_bufferSize + 1); + ResizeBits((m_store.m_fileSize - 1) / m_store.m_bufferSize + 1); m_store.m_creationTime = time(0); } @@ -162,8 +162,8 @@ void Info::ResizeBits(int s) // drop buffer in case of failed/partial reads if (m_store.m_buff_synced) free(m_store.m_buff_synced); - if (m_buff_written) free(m_buff_written); - if (m_buff_prefetch) free(m_buff_prefetch); + if (m_buff_written) free(m_buff_written); + if (m_buff_prefetch) free(m_buff_prefetch); m_sizeInBits = s; m_buff_written = (unsigned char*) malloc(GetSizeInBytes()); @@ -176,6 +176,10 @@ void Info::ResizeBits(int s) m_buff_prefetch = (unsigned char*) malloc(GetSizeInBytes()); memset(m_buff_prefetch, 0, GetSizeInBytes()); } + else + { + m_buff_prefetch = 0; + } } //------------------------------------------------------------------------------ @@ -367,6 +371,12 @@ bool Info::Write(XrdOssDF* fp, const std::string &fname) //------------------------------------------------------------------------------ +void Info::ResetAllAccessStats() +{ + m_store.m_accessCnt = 0; + m_store.m_astats.clear(); +} + void Info::WriteIOStatAttach() { m_store.m_accessCnt++; diff --git a/src/XrdFileCache/XrdFileCacheInfo.hh b/src/XrdFileCache/XrdFileCacheInfo.hh index 95e674bd22d..6e786f9c0a6 100644 --- a/src/XrdFileCache/XrdFileCacheInfo.hh +++ b/src/XrdFileCache/XrdFileCacheInfo.hh @@ -142,6 +142,11 @@ public: //--------------------------------------------------------------------- void DisableDownloadStatus(); + //--------------------------------------------------------------------- + //! Reset IO Stats + //--------------------------------------------------------------------- + void ResetAllAccessStats(); + //--------------------------------------------------------------------- //! Write open time in the last entry of access statistics //--------------------------------------------------------------------- @@ -222,6 +227,16 @@ public: long long GetNDownloadedBytes() const; //--------------------------------------------------------------------- + //! Get number of the last downloaded block + //--------------------------------------------------------------------- + int GetLastDownloadedBlock() const; + + //--------------------------------------------------------------------- + //! Get expected data file size + //--------------------------------------------------------------------- + long long GetExpectedDataFileSize() const; + + //--------------------------------------------------------------------- //! Update complete status //--------------------------------------------------------------------- void UpdateDownloadCompleteStatus(); @@ -310,6 +325,23 @@ inline long long Info::GetNDownloadedBytes() const return m_store.m_bufferSize * GetNDownloadedBlocks(); } +inline int Info::GetLastDownloadedBlock() const +{ + for (int i = m_sizeInBits - 1; i >= 0; --i) + if (TestBit(i)) return i; + + return -1; +} + +inline long long Info::GetExpectedDataFileSize() const +{ + int last_block = GetLastDownloadedBlock(); + if (last_block == m_sizeInBits - 1) + return m_store.m_fileSize; + else + return (last_block + 1) * m_store.m_bufferSize; +} + inline int Info::GetSizeInBytes() const { if (m_sizeInBits)