diff --git a/src/XrdPfc/XrdPfc.cc b/src/XrdPfc/XrdPfc.cc index 3da53ea8baa..95af7561ea6 100644 --- a/src/XrdPfc/XrdPfc.cc +++ b/src/XrdPfc/XrdPfc.cc @@ -1010,16 +1010,10 @@ int Cache::Unlink(const char *curl) // printf("Unlink url=%s\n\t fname=%s\n", curl, f_name.c_str()); - return UnlinkCommon(f_name, false); + return UnlinkFile(f_name, false); } - -int Cache::UnlinkUnlessOpen(const std::string& f_name) -{ - return UnlinkCommon(f_name, true); -} - -int Cache::UnlinkCommon(const std::string& f_name, bool fail_if_open) +int Cache::UnlinkFile(const std::string& f_name, bool fail_if_open) { ActiveMap_i it; File *file = 0; diff --git a/src/XrdPfc/XrdPfc.hh b/src/XrdPfc/XrdPfc.hh index 757ad7dd8b7..d96abedf30f 100644 --- a/src/XrdPfc/XrdPfc.hh +++ b/src/XrdPfc/XrdPfc.hh @@ -342,9 +342,9 @@ public: void Purge(); //--------------------------------------------------------------------- - //! Remove file from cache unless it is currently open. + //! Remove cinfo and data files from cache. //--------------------------------------------------------------------- - int UnlinkUnlessOpen(const std::string& f_name); + int UnlinkFile(const std::string& f_name, bool fail_if_open); //--------------------------------------------------------------------- //! Add downloaded block in write queue. @@ -402,8 +402,6 @@ private: bool cfg2bytes(const std::string &str, long long &store, long long totalSpace, const char *name); - int UnlinkCommon(const std::string& f_name, bool fail_if_open); - static Cache *m_instance; //!< this object XrdOucEnv *m_env; //!< environment passed in at creation diff --git a/src/XrdPfc/XrdPfcCommand.cc b/src/XrdPfc/XrdPfcCommand.cc index fc8f0dc1e1f..4b194bc80f7 100644 --- a/src/XrdPfc/XrdPfcCommand.cc +++ b/src/XrdPfc/XrdPfcCommand.cc @@ -234,7 +234,7 @@ void Cache::ExecuteCommandUrl(const std::string& command_url) Info myInfo(m_trace, false); myInfo.SetBufferSize(block_size); - myInfo.SetFileSize(file_size); + myInfo.SetFileSizeAndCreationTime(file_size); myInfo.SetAllBitsSynced(); for (int i = 0; i < at_count; ++i) @@ -312,7 +312,7 @@ void Cache::ExecuteCommandUrl(const std::string& command_url) TRACE(Debug, err_prefix << "file argument '" << f_name << "'."); - int ret = UnlinkCommon(f_name, true); + int ret = UnlinkFile(f_name, true); TRACE(Info, err_prefix << "returned with status " << ret); } diff --git a/src/XrdPfc/XrdPfcConfiguration.cc b/src/XrdPfc/XrdPfcConfiguration.cc index 93842110b42..94500dcc4e3 100644 --- a/src/XrdPfc/XrdPfcConfiguration.cc +++ b/src/XrdPfc/XrdPfcConfiguration.cc @@ -785,12 +785,11 @@ bool Cache::ConfigParameters(std::string part, XrdOucStream& config, TmpConfigur return false; } } - else if ( part == "hdfsmode" || part == "filefragmentmode" ) + else if ( part == "hdfsmode" ) { - if (part == "filefragmentmode") - { - m_log.Emsg("Config", "pfc.filefragmentmode is deprecated, please use pfc.hdfsmode instead. Replacing the directive internally."); - } + m_log.Emsg("Config", "pfc.hdfsmode is currently unsupported."); + return false; + m_configuration.m_hdfsmode = true; const char* params = cwg.GetWord(); diff --git a/src/XrdPfc/XrdPfcFile.cc b/src/XrdPfc/XrdPfcFile.cc index b658b6497c7..4dcb324deaf 100644 --- a/src/XrdPfc/XrdPfcFile.cc +++ b/src/XrdPfc/XrdPfcFile.cc @@ -111,8 +111,9 @@ File* File::FileOpen(const std::string &path, long long offset, long long fileSi void File::initiate_emergency_shutdown() { - // Called from Cache::UnlinkCommon() when the file is currently open. - // CacheUnlink is also called on FSync error. + // Called from Cache::Unlink() when the file is currently open. + // Cache::Unlink is also called on FSync error and when wrong number of bytes + // is received from a remote read. // // From this point onward the file will not be written to, cinfo file will // not be updated, and all new read requests will return -ENOENT. @@ -477,7 +478,7 @@ bool File::Open() if (initialize_info_file) { m_cfi.SetBufferSize(conf.m_bufferSize); - m_cfi.SetFileSize(m_file_size); + m_cfi.SetFileSizeAndCreationTime(m_file_size); m_cfi.SetCkSumState(conf.get_cs_Chk()); m_cfi.Write(m_info_file, ifn.c_str()); m_info_file->Fsync(); @@ -1054,7 +1055,7 @@ void File::Sync() TRACEF(Error, "Sync failed, unlinking local files and initiating shutdown of File object"); // Unlink will also call this->initiate_emergency_shutdown() - Cache::GetInstance().Unlink(m_filename.c_str()); + Cache::GetInstance().UnlinkFile(m_filename, false); XrdSysCondVarHelper _lck(&m_state_cond); @@ -1179,12 +1180,20 @@ void File::ProcessBlockResponse(BlockResponseHandler* brh, int res) { static const char* tpfx = "ProcessBlockResponse "; - XrdSysCondVarHelper _lck(m_state_cond); - Block *b = brh->m_block; TRACEF(Dump, tpfx << "block=" << b << ", idx=" << b->m_offset/BufferSize() << ", off=" << b->m_offset << ", res=" << res); + if (res >= 0 && res != b->get_size()) + { + // Incorrect number of bytes received, apparently size of the file on the remote + // is different than what the cache expects it to be. + TRACEF(Error, tpfx << "Wrong number of bytes received, assuming remote/local file size mismatch, unlinking local files and initiating shutdown of File object"); + Cache::GetInstance().UnlinkFile(m_filename, false); + } + + XrdSysCondVarHelper _lck(m_state_cond); + // Deregister block from IO's prefetch count, if needed. if (b->m_prefetch) { diff --git a/src/XrdPfc/XrdPfcIOFileBlock.cc b/src/XrdPfc/XrdPfcIOFileBlock.cc index 413f0ef5219..87893d71433 100644 --- a/src/XrdPfc/XrdPfcIOFileBlock.cc +++ b/src/XrdPfc/XrdPfcIOFileBlock.cc @@ -257,8 +257,8 @@ int IOFileBlock::initLocalStat() // The info file is used to get file size on defer open // don't initalize buffer, it does not hold useful information in this case m_info.SetBufferSize(m_cache.RefConfiguration().m_bufferSize); - m_info.DisableDownloadStatus(); - m_info.SetFileSize(tmpStat.st_size); + // m_info.DisableDownloadStatus(); -- this stopped working a while back. + m_info.SetFileSizeAndCreationTime(tmpStat.st_size); m_info.Write(m_info_file, path.c_str()); m_info_file->Fsync(); } diff --git a/src/XrdPfc/XrdPfcInfo.cc b/src/XrdPfc/XrdPfcInfo.cc index b8d4e14178b..492ec84c2ca 100644 --- a/src/XrdPfc/XrdPfcInfo.cc +++ b/src/XrdPfc/XrdPfcInfo.cc @@ -92,7 +92,7 @@ struct FpHelper } // Returns true on error - bool WriteRaw(void *buf, ssize_t size) + bool WriteRaw(const void *buf, ssize_t size) { ssize_t ret = f_fp->Write(buf, f_off, size); if (ret != size) @@ -105,7 +105,7 @@ struct FpHelper return false; } - template bool Write(T &loc) + template bool Write(const T &loc) { return WriteRaw(&loc, sizeof(T)); } @@ -124,17 +124,18 @@ const int Info::s_defaultVersion = 4; Info::Info(XrdSysTrace* trace, bool prefetchBuffer) : m_trace(trace), - m_hasPrefetchBuffer(prefetchBuffer), - m_buff_written(0), m_buff_prefetch(0), - m_sizeInBits(0), + m_buff_synced(0), m_buff_written(0), m_buff_prefetch(0), + m_version(0), + m_bitvecSizeInBits(0), m_complete(false), + m_hasPrefetchBuffer(prefetchBuffer), m_cksCalcMd5(0) {} Info::~Info() { - if (m_store.m_buff_synced) free(m_store.m_buff_synced); - if (m_buff_written) free(m_buff_written); + if (m_buff_synced) free(m_buff_synced); + if (m_buff_written) free(m_buff_written); if (m_buff_prefetch) free(m_buff_prefetch); delete m_cksCalcMd5; } @@ -144,14 +145,14 @@ Info::~Info() void Info::SetAllBitsSynced() { // The following should be: - // memset(m_store.m_buff_synced, 255, GetSizeInBytes()); + // memset(m_buff_synced, 255, GetBitvecSizeInBytes()); // but GCC produces an overzealous 'possible argument transpose warning' and // xrootd build uses warnings->errors escalation. // This workaround can be removed for gcc >= 5. // See also: https://gcc.gnu.org/bugzilla/show_bug.cgi?id=61294 - const int nb = GetSizeInBytes(); + const int nb = GetBitvecSizeInBytes(); for (int i = 0; i < nb; ++i) - m_store.m_buff_synced[i] = 255; + m_buff_synced[i] = 255; m_complete = true; } @@ -166,33 +167,34 @@ void Info::SetBufferSize(long long bs) //------------------------------------------------------------------------------s -void Info::SetFileSize(long long fs) +void Info::SetFileSizeAndCreationTime(long long fs) { m_store.m_file_size = fs; - ResizeBits((m_store.m_file_size - 1) / m_store.m_buffer_size + 1); + ResizeBits(); m_store.m_creationTime = time(0); } //------------------------------------------------------------------------------ -void Info::ResizeBits(int s) +void Info::ResizeBits() { // drop buffer in case of failed/partial reads - if (m_store.m_buff_synced) free(m_store.m_buff_synced); - if (m_buff_written) free(m_buff_written); - if (m_buff_prefetch) free(m_buff_prefetch); + if (m_buff_synced) free(m_buff_synced); + if (m_buff_written) free(m_buff_written); + if (m_buff_prefetch) free(m_buff_prefetch); - m_sizeInBits = s; - m_buff_written = (unsigned char*) malloc(GetSizeInBytes()); - m_store.m_buff_synced = (unsigned char*) malloc(GetSizeInBytes()); - memset(m_buff_written, 0, GetSizeInBytes()); - memset(m_store.m_buff_synced, 0, GetSizeInBytes()); + m_bitvecSizeInBits = (m_store.m_file_size - 1) / m_store.m_buffer_size + 1; + + m_buff_written = (unsigned char*) malloc(GetBitvecSizeInBytes()); + m_buff_synced = (unsigned char*) malloc(GetBitvecSizeInBytes()); + memset(m_buff_written, 0, GetBitvecSizeInBytes()); + memset(m_buff_synced, 0, GetBitvecSizeInBytes()); if (m_hasPrefetchBuffer) { - m_buff_prefetch = (unsigned char*) malloc(GetSizeInBytes()); - memset(m_buff_prefetch, 0, GetSizeInBytes()); + m_buff_prefetch = (unsigned char*) malloc(GetBitvecSizeInBytes()); + memset(m_buff_prefetch, 0, GetBitvecSizeInBytes()); } else { @@ -202,14 +204,6 @@ void Info::ResizeBits(int s) //------------------------------------------------------------------------------ -void Info::DisableDownloadStatus() -{ - // use version sign to skip download status - m_store.m_version = -m_store.m_version; -} - -//------------------------------------------------------------------------------ - void Info::ResetCkSumCache() { if (IsCkSumCache()) @@ -234,20 +228,25 @@ void Info::ResetCkSumNet() // Write / Read cinfo file //------------------------------------------------------------------------------ -uint32_t Info::GetCksum() +uint32_t Info::CalcCksumStore() { - uint32_t cks = crc32c(0, &m_store, 24); - return crc32c(cks, m_store.m_buff_synced, GetSizeInBytes()); + return crc32c(0, &m_store, sizeof(Store)); } -void Info::GetCksumMd5(unsigned char* buff, char* digest) +uint32_t Info::CalcCksumSyncedAndAStats() +{ + uint32_t cks = crc32c(0, m_buff_synced, GetBitvecSizeInBytes()); + return crc32c(cks, m_astats.data(), m_astats.size() * sizeof(AStat)); +} + +void Info::CalcCksumMd5(unsigned char* buff, char* digest) { if (m_cksCalcMd5) m_cksCalcMd5->Init(); else m_cksCalcMd5 = new XrdCksCalcmd5(); - m_cksCalcMd5->Update((const char*)buff, GetSizeInBytes()); + m_cksCalcMd5->Update((const char*)buff, GetBitvecSizeInBytes()); memcpy(digest, m_cksCalcMd5->Final(), 16); } @@ -270,30 +269,21 @@ const char* Info::GetCkSumStateAsText() const bool Info::Write(XrdOssDF* fp, const char *dname, const char *fname) { - TraceHeader trace_pfx(":Write()", dname, fname); + TraceHeader trace_pfx("Write()", dname, fname); - if (m_store.m_astats.size() > s_maxNumAccess) CompactifyAccessRecords(); + if (m_astats.size() > s_maxNumAccess) CompactifyAccessRecords(); + m_store.m_astatSize = (int32_t) m_astats.size(); FpHelper w(fp, 0, m_trace, m_traceID, trace_pfx); - m_store.m_version = s_defaultVersion; - if (w.Write(m_store.m_version)) return false; - if (w.Write(m_store.m_status._raw_)) return false; - if (w.Write(m_store.m_buffer_size)) return false; - if (w.Write(m_store.m_file_size)) return false; - - if (w.WriteRaw(m_store.m_buff_synced, GetSizeInBytes())) return false; - - m_store.m_cksum = GetCksum(); - if (w.Write(m_store.m_cksum)) return false; - - if (w.Write(m_store.m_creationTime)) return false; - if (w.Write(m_store.m_noCkSumTime)) return false; - - if (w.Write(m_store.m_accessCnt)) return false; - for (std::vector::iterator it = m_store.m_astats.begin(); it != m_store.m_astats.end(); ++it) + if (w.Write(s_defaultVersion) || + w.Write(m_store) || + w.Write(CalcCksumStore()) || + w.WriteRaw(m_buff_synced, GetBitvecSizeInBytes()) || + w.WriteRaw(m_astats.data(), m_store.m_astatSize * sizeof(AStat)) || + w.Write(CalcCksumSyncedAndAStats())) { - if (w.Write(*it)) return false; + return false; } return true; @@ -310,67 +300,59 @@ bool Info::Read(XrdOssDF *fp, const char *dname, const char *fname) // Does not need lock, called only in File::Open before File::Run() starts. // XXXX Wait, how about Purge, and LocalFilePath, Stat? - TraceHeader trace_pfx(":Read()", dname, fname); + TraceHeader trace_pfx("Read()", dname, fname); FpHelper r(fp, 0, m_trace, m_traceID, trace_pfx); - if (r.Read(m_store.m_version)) return false; + if (r.Read(m_version)) return false; - if (m_store.m_version != s_defaultVersion) + if (m_version != s_defaultVersion) { - if (abs(m_store.m_version) == 1) + if (m_version == 2) { - return ReadV1(fp, dname, fname); + return ReadV2(fp, r.f_off, dname, fname); } - else if (m_store.m_version == 2) + else if (m_version == 3) { - return ReadV2(fp, dname, fname); - } - else if (m_store.m_version == 3) - { - return ReadV3(fp, dname, fname); + return ReadV3(fp, r.f_off, dname, fname); } else { - TRACE(Warning, trace_pfx << "File version " << m_store.m_version << " not supported."); + TRACE(Warning, trace_pfx << "File version " << m_version << " not supported."); return false; } } - if (r.Read(m_store.m_status._raw_)) return false; - if (r.Read(m_store.m_buffer_size)) return false; - long long fs; - if (r.Read(fs)) return false; - SetFileSize(fs); + uint32_t cksum; - if (r.ReadRaw(m_store.m_buff_synced, GetSizeInBytes())) return false; - memcpy(m_buff_written, m_store.m_buff_synced, GetSizeInBytes()); + if (r.Read(m_store) || r.Read(cksum)) return false; - if (r.Read(m_store.m_cksum)) return false; - if (GetCksum() != m_store.m_cksum) + if (cksum != CalcCksumStore()) { - TRACE(Error, trace_pfx << "Buffer cksum and saved cksum don't match \n"); + TRACE(Error, trace_pfx << "Checksum Store mismatch."); return false; } - // cache complete status - m_complete = ! IsAnythingEmptyInRng(0, m_sizeInBits); + ResizeBits(); + m_astats.resize(m_store.m_astatSize); - // read creation and no-cksum times - if (r.Read(m_store.m_creationTime)) return false; - if (r.Read(m_store.m_noCkSumTime)) return false; - - // get number of accessess - if (r.Read(m_store.m_accessCnt, false)) m_store.m_accessCnt = 0; // was: return false; + if (r.ReadRaw(m_buff_synced, GetBitvecSizeInBytes()) || + r.ReadRaw(m_astats.data(), m_store.m_astatSize * sizeof(AStat)) || + r.Read(cksum)) + { + return false; + } - // read access statistics - m_store.m_astats.reserve(std::min(m_store.m_accessCnt, s_maxNumAccess)); - AStat as; - while ( ! r.Read(as, false)) + if (cksum != CalcCksumSyncedAndAStats()) { - m_store.m_astats.emplace_back(as); + TRACE(Error, trace_pfx << "Checksum Synced or AStats mismatch."); + return false; } + memcpy(m_buff_written, m_buff_synced, GetBitvecSizeInBytes()); + + m_complete = ! IsAnythingEmptyInRng(0, m_bitvecSizeInBits); + return true; } @@ -381,7 +363,8 @@ bool Info::Read(XrdOssDF *fp, const char *dname, const char *fname) void Info::ResetAllAccessStats() { m_store.m_accessCnt = 0; - m_store.m_astats.clear(); + m_store.m_astatSize = 0; + m_astats.clear(); } void Info::AStat::MergeWith(const Info::AStat &b) @@ -401,7 +384,7 @@ void Info::CompactifyAccessRecords() { time_t now = time(0); - std::vector &v = m_store.m_astats; + std::vector &v = m_astats; for (int i = 0; i < (int) v.size() - 1; ++i) { @@ -444,21 +427,21 @@ void Info::WriteIOStatAttach() AStat as; as.AttachTime = time(0); - m_store.m_astats.push_back(as); + m_astats.push_back(as); } void Info::WriteIOStat(Stats& s) { - m_store.m_astats.back().NumIos = s.m_NumIos; - m_store.m_astats.back().Duration = s.m_Duration; - m_store.m_astats.back().BytesHit = s.m_BytesHit; - m_store.m_astats.back().BytesMissed = s.m_BytesMissed; - m_store.m_astats.back().BytesBypassed = s.m_BytesBypassed; + m_astats.back().NumIos = s.m_NumIos; + m_astats.back().Duration = s.m_Duration; + m_astats.back().BytesHit = s.m_BytesHit; + m_astats.back().BytesMissed = s.m_BytesMissed; + m_astats.back().BytesBypassed = s.m_BytesBypassed; } void Info::WriteIOStatDetach(Stats& s) { - m_store.m_astats.back().DetachTime = time(0); + m_astats.back().DetachTime = time(0); WriteIOStat(s); } @@ -470,7 +453,7 @@ void Info::WriteIOStatSingle(long long bytes_disk) as.AttachTime = as.DetachTime = time(0); as.NumIos = 1; as.BytesHit = bytes_disk; - m_store.m_astats.push_back(as); + m_astats.push_back(as); } void Info::WriteIOStatSingle(long long bytes_disk, time_t att, time_t dtc) @@ -483,20 +466,20 @@ void Info::WriteIOStatSingle(long long bytes_disk, time_t att, time_t dtc) as.NumIos = 1; as.Duration = dtc - att; as.BytesHit = bytes_disk; - m_store.m_astats.push_back(as); + m_astats.push_back(as); } //------------------------------------------------------------------------------ bool Info::GetLatestDetachTime(time_t& t) const { - if (m_store.m_astats.empty()) + if (m_astats.empty()) { t = m_store.m_creationTime; } else { - const AStat& ls = m_store.m_astats.back(); + const AStat& ls = m_astats.back(); if (ls.DetachTime == 0) t = ls.AttachTime + ls.Duration; @@ -509,41 +492,38 @@ bool Info::GetLatestDetachTime(time_t& t) const const Info::AStat* Info::GetLastAccessStats() const { - return m_store.m_astats.empty() ? 0 : & m_store.m_astats.back(); + return m_astats.empty() ? 0 : & m_astats.back(); } //============================================================================== // Support for reading of previous cinfo versions //============================================================================== -bool Info::ReadV3(XrdOssDF* fp, const char *dname, const char *fname) +bool Info::ReadV3(XrdOssDF* fp, off_t off, const char *dname, const char *fname) { - TraceHeader trace_pfx(":ReadV3()", dname, fname); + TraceHeader trace_pfx("ReadV3()", dname, fname); - FpHelper r(fp, 0, m_trace, m_traceID, trace_pfx); + FpHelper r(fp, off, m_trace, m_traceID, trace_pfx); - if (r.Read(m_store.m_version)) return false; if (r.Read(m_store.m_buffer_size)) return false; + if (r.Read(m_store.m_file_size)) return false; + ResizeBits(); - long long fs; - if (r.Read(fs)) return false; - SetFileSize(fs); - - if (r.ReadRaw(m_store.m_buff_synced, GetSizeInBytes())) return false; - memcpy(m_buff_written, m_store.m_buff_synced, GetSizeInBytes()); + if (r.ReadRaw(m_buff_synced, GetBitvecSizeInBytes())) return false; + memcpy(m_buff_written, m_buff_synced, GetBitvecSizeInBytes()); char fileCksum[16], tmpCksum[16]; if (r.ReadRaw(&fileCksum[0], 16)) return false; - GetCksumMd5(&m_store.m_buff_synced[0], &tmpCksum[0]); + CalcCksumMd5(&m_buff_synced[0], &tmpCksum[0]); if (memcmp(&fileCksum[0], &tmpCksum[0], 16)) { - TRACE(Error, trace_pfx << "buffer cksum and saved cksum don't match \n"); + TRACE(Error, trace_pfx << "buffer cksum and saved cksum don't match."); return false; } // cache complete status - m_complete = ! IsAnythingEmptyInRng(0, m_sizeInBits); + m_complete = ! IsAnythingEmptyInRng(0, m_bitvecSizeInBits); // read creation time if (r.Read(m_store.m_creationTime)) return false; @@ -552,12 +532,20 @@ bool Info::ReadV3(XrdOssDF* fp, const char *dname, const char *fname) if (r.Read(m_store.m_accessCnt, false)) m_store.m_accessCnt = 0; // was: return false; // read access statistics - m_store.m_astats.reserve(std::min(m_store.m_accessCnt, s_maxNumAccess)); + m_astats.reserve(std::min(m_store.m_accessCnt, s_maxNumAccess)); AStat as; while ( ! r.Read(as, false)) { + // Consistency check ... weird stuff seen at UCSD StashCache. + if (as.NumIos <= 0 || as.AttachTime < 3600*24*365 || + (as.DetachTime != 0 && (as.DetachTime < 3600*24*365 || as.DetachTime < as.AttachTime))) + { + TRACE(Warning, trace_pfx << "Corrupted access record, skipping."); + continue; + } + as.Reserved = 0; - m_store.m_astats.emplace_back(as); + m_astats.emplace_back(as); } // Comment for V4: m_store.m_noCkSumTime and m_store_mstatus.f_cksum_check @@ -566,7 +554,7 @@ bool Info::ReadV3(XrdOssDF* fp, const char *dname, const char *fname) return true; } -bool Info::ReadV2(XrdOssDF* fp, const char *dname, const char *fname) +bool Info::ReadV2(XrdOssDF* fp, off_t off, const char *dname, const char *fname) { struct AStatV2 { @@ -577,32 +565,29 @@ bool Info::ReadV2(XrdOssDF* fp, const char *dname, const char *fname) long long BytesBypassed; //! read remote client }; - TraceHeader trace_pfx(":ReadV2()", dname, fname); + TraceHeader trace_pfx("ReadV2()", dname, fname); - FpHelper r(fp, 0, m_trace, m_traceID, trace_pfx); + FpHelper r(fp, off, m_trace, m_traceID, trace_pfx); - if (r.Read(m_store.m_version)) return false; if (r.Read(m_store.m_buffer_size)) return false; + if (r.Read(m_store.m_file_size)) return false; + ResizeBits(); - long long fs; - if (r.Read(fs)) return false; - SetFileSize(fs); - - if (r.ReadRaw(m_store.m_buff_synced, GetSizeInBytes())) return false; - memcpy(m_buff_written, m_store.m_buff_synced, GetSizeInBytes()); + if (r.ReadRaw(m_buff_synced, GetBitvecSizeInBytes())) return false; + memcpy(m_buff_written, m_buff_synced, GetBitvecSizeInBytes()); char fileCksum[16], tmpCksum[16]; if (r.ReadRaw(&fileCksum[0], 16)) return false; - GetCksumMd5(&m_store.m_buff_synced[0], &tmpCksum[0]); + CalcCksumMd5(&m_buff_synced[0], &tmpCksum[0]); if (memcmp(&fileCksum[0], &tmpCksum[0], 16)) { - TRACE(Error, trace_pfx << "buffer cksum and saved cksum don't match \n"); + TRACE(Error, trace_pfx << "buffer cksum and saved cksum don't match."); return false; } // cache complete status - m_complete = ! IsAnythingEmptyInRng(0, m_sizeInBits); + m_complete = ! IsAnythingEmptyInRng(0, m_bitvecSizeInBits); // read creation time if (r.Read(m_store.m_creationTime)) return false; @@ -611,7 +596,7 @@ bool Info::ReadV2(XrdOssDF* fp, const char *dname, const char *fname) if (r.Read(m_store.m_accessCnt, false)) m_store.m_accessCnt = 0; // was: return false; // read access statistics - m_store.m_astats.reserve(std::min(m_store.m_accessCnt, s_maxNumAccess)); + m_astats.reserve(std::min(m_store.m_accessCnt, s_maxNumAccess)); AStatV2 av2; while ( ! r.ReadRaw(&av2, sizeof(AStatV2), false)) { @@ -626,62 +611,15 @@ bool Info::ReadV2(XrdOssDF* fp, const char *dname, const char *fname) as.BytesMissed = av2.BytesMissed; as.BytesBypassed = av2.BytesBypassed; - m_store.m_astats.emplace_back(as); - } - - return true; -} - -//------------------------------------------------------------------------------ - -bool Info::ReadV1(XrdOssDF* fp, const char *dname, const char *fname) -{ - struct AStatV1 - { - time_t DetachTime; //! close time - long long BytesHit; //! read from disk - long long BytesMissed; //! read from ram - long long BytesBypassed; //! read remote client - }; - - TraceHeader trace_pfx(":ReadV1()", dname, fname); - - FpHelper r(fp, 0, m_trace, m_traceID, trace_pfx); - - if (r.Read(m_store.m_version)) return false; - if (r.Read(m_store.m_buffer_size)) return false; - - long long fs; - if (r.Read(fs)) return false; - SetFileSize(fs); - - if (r.ReadRaw(m_store.m_buff_synced, GetSizeInBytes())) return false; - memcpy(m_buff_written, m_store.m_buff_synced, GetSizeInBytes()); - - m_complete = ! IsAnythingEmptyInRng(0, m_sizeInBits); - if (r.ReadRaw(&m_store.m_accessCnt, sizeof(int), false)) m_store.m_accessCnt = 0; // was: return false; - - m_store.m_astats.reserve(std::min(m_store.m_accessCnt, s_maxNumAccess)); - AStatV1 av1; - while ( ! r.ReadRaw(&av1, sizeof(AStatV1), false)) - { - AStat as; - as.AttachTime = av1.DetachTime; - as.DetachTime = av1.DetachTime; - as.NumIos = 1; - as.Duration = 0; - as.NumMerged = 0; - as.Reserved = 0; - as.BytesHit = av1.BytesHit; - as.BytesMissed = av1.BytesMissed; - as.BytesBypassed = av1.BytesBypassed; - - m_store.m_astats.emplace_back(as); - } + // Consistency check ... weird stuff seen at UCSD StashCache. + if (as.AttachTime < 3600*24*365 || + (as.DetachTime != 0 && (as.DetachTime < 3600*24*365 || as.DetachTime < as.AttachTime))) + { + TRACE(Warning, trace_pfx << "Corrupted access record, skipping."); + continue; + } - if ( ! m_store.m_astats.empty()) - { - m_store.m_creationTime = m_store.m_astats.front().AttachTime; + m_astats.emplace_back(as); } return true; diff --git a/src/XrdPfc/XrdPfcInfo.hh b/src/XrdPfc/XrdPfcInfo.hh index 99bd7d0e7d1..5f2b7e75359 100644 --- a/src/XrdPfc/XrdPfcInfo.hh +++ b/src/XrdPfc/XrdPfcInfo.hh @@ -79,20 +79,17 @@ public: struct Store { - int m_version; //!< info file version - Status m_status; //!< status information long long m_buffer_size; //!< buffer / block size long long m_file_size; //!< size of file in bytes - unsigned char *m_buff_synced; //!< disk written state vector - uint32_t m_cksum; //!< cksum of downloaded information time_t m_creationTime; //!< time the info file was created time_t m_noCkSumTime; //!< time when first non-cksummed block was detected size_t m_accessCnt; //!< total access count for the file - std::vector m_astats; //!< access records + Status m_status; //!< status information + int m_astatSize; //!< size of AStat vector Store () : - m_version(1), m_buffer_size(-1), m_file_size(0), m_buff_synced(0), - m_creationTime(0), m_noCkSumTime(0), m_accessCnt(0) + m_buffer_size(0), m_file_size(0), m_creationTime(0), m_noCkSumTime(0), + m_accessCnt(0), m_astatSize(0) {} }; @@ -139,13 +136,12 @@ public: void SetBufferSize(long long); - void SetFileSize(long long); + void SetFileSizeAndCreationTime(long long); //--------------------------------------------------------------------- - //! \brief Reserve buffer for file_size / buffer_size bytes - //! @param n number of file blocks + //! \brief Reserve bit vectors for file_size / buffer_size bytes. //--------------------------------------------------------------------- - void ResizeBits(int n); + void ResizeBits(); //--------------------------------------------------------------------- //! \brief Read content of cinfo file into this object @@ -170,11 +166,6 @@ public: //--------------------------------------------------------------------- void CompactifyAccessRecords(); - //--------------------------------------------------------------------- - //! Disable allocating, writing, and reading of download status - //--------------------------------------------------------------------- - void DisableDownloadStatus(); - //--------------------------------------------------------------------- //! Reset IO Stats //--------------------------------------------------------------------- @@ -213,7 +204,7 @@ public: //--------------------------------------------------------------------- //! Get size of download-state bit-vector in bytes. //--------------------------------------------------------------------- - int GetSizeInBytes() const; + int GetBitvecSizeInBytes() const; //--------------------------------------------------------------------- //! Get number of blocks represented in download-state bit-vector. @@ -278,12 +269,13 @@ public: //--------------------------------------------------------------------- //! Get version //--------------------------------------------------------------------- - int GetVersion() { return m_store.m_version; } + int GetVersion() { return m_version; } //--------------------------------------------------------------------- //! Get stored data //--------------------------------------------------------------------- - const Store& RefStoredData() const { return m_store; } + const Store& RefStoredData() const { return m_store; } + const std::vector& RefAStats() const { return m_astats; } //--------------------------------------------------------------------- //! Get file size @@ -293,8 +285,9 @@ public: //--------------------------------------------------------------------- //! Get cksum, MD5 is for backward compatibility with V2 and V3. //--------------------------------------------------------------------- - uint32_t GetCksum(); - void GetCksumMd5(unsigned char* buff, char* digest); + uint32_t CalcCksumStore(); + uint32_t CalcCksumSyncedAndAStats(); + void CalcCksumMd5(unsigned char* buff, char* digest); CkSumCheck_e GetCkSumState() const { return (CkSumCheck_e) m_store.m_status.f_cksum_check; } const char* GetCkSumStateAsText() const; @@ -329,20 +322,22 @@ protected: XrdSysTrace* m_trace; Store m_store; - bool m_hasPrefetchBuffer; //!< constains current prefetch score + unsigned char *m_buff_synced; //!< disk written state vector unsigned char *m_buff_written; //!< download state vector unsigned char *m_buff_prefetch; //!< prefetch statistics + std::vector m_astats; //!< access records - int m_sizeInBits; //!< cached + int m_version; + int m_bitvecSizeInBits; //!< cached bool m_complete; //!< cached + bool m_hasPrefetchBuffer; //!< constains current prefetch score private: inline unsigned char cfiBIT(int n) const { return 1 << n; } // Reading functions for older cinfo file formats - bool ReadV1(XrdOssDF* fp, const char *dname, const char *fname); - bool ReadV2(XrdOssDF* fp, const char *dname, const char *fname); - bool ReadV3(XrdOssDF* fp, const char *dname, const char *fname); + bool ReadV2(XrdOssDF* fp, off_t off, const char *dname, const char *fname); + bool ReadV3(XrdOssDF* fp, off_t off, const char *dname, const char *fname); XrdCksCalc* m_cksCalcMd5; }; @@ -352,7 +347,7 @@ private: inline bool Info::TestBitWritten(int i) const { const int cn = i/8; - assert(cn < GetSizeInBytes()); + assert(cn < GetBitvecSizeInBytes()); const int off = i - cn*8; return (m_buff_written[cn] & cfiBIT(off)) != 0; @@ -361,7 +356,7 @@ inline bool Info::TestBitWritten(int i) const inline void Info::SetBitWritten(int i) { const int cn = i/8; - assert(cn < GetSizeInBytes()); + assert(cn < GetBitvecSizeInBytes()); const int off = i - cn*8; m_buff_written[cn] |= cfiBIT(off); @@ -372,7 +367,7 @@ inline void Info::SetBitPrefetch(int i) if (!m_buff_prefetch) return; const int cn = i/8; - assert(cn < GetSizeInBytes()); + assert(cn < GetBitvecSizeInBytes()); const int off = i - cn*8; m_buff_prefetch[cn] |= cfiBIT(off); @@ -383,7 +378,7 @@ inline bool Info::TestBitPrefetch(int i) const if (!m_buff_prefetch) return false; const int cn = i/8; - assert(cn < GetSizeInBytes()); + assert(cn < GetBitvecSizeInBytes()); const int off = i - cn*8; return (m_buff_prefetch[cn] & cfiBIT(off)) != 0; @@ -392,10 +387,10 @@ inline bool Info::TestBitPrefetch(int i) const inline void Info::SetBitSynced(int i) { const int cn = i/8; - assert(cn < GetSizeInBytes()); + assert(cn < GetBitvecSizeInBytes()); const int off = i - cn*8; - m_store.m_buff_synced[cn] |= cfiBIT(off); + m_buff_synced[cn] |= cfiBIT(off); } //------------------------------------------------------------------------------ @@ -403,7 +398,7 @@ inline void Info::SetBitSynced(int i) inline int Info::GetNDownloadedBlocks() const { int cntd = 0; - for (int i = 0; i < m_sizeInBits; ++i) + for (int i = 0; i < m_bitvecSizeInBits; ++i) if (TestBitWritten(i)) cntd++; return cntd; @@ -416,7 +411,7 @@ inline long long Info::GetNDownloadedBytes() const inline int Info::GetLastDownloadedBlock() const { - for (int i = m_sizeInBits - 1; i >= 0; --i) + for (int i = m_bitvecSizeInBits - 1; i >= 0; --i) if (TestBitWritten(i)) return i; return -1; @@ -425,23 +420,23 @@ inline int Info::GetLastDownloadedBlock() const inline long long Info::GetExpectedDataFileSize() const { int last_block = GetLastDownloadedBlock(); - if (last_block == m_sizeInBits - 1) + if (last_block == m_bitvecSizeInBits - 1) return m_store.m_file_size; else return (last_block + 1) * m_store.m_buffer_size; } -inline int Info::GetSizeInBytes() const +inline int Info::GetBitvecSizeInBytes() const { - if (m_sizeInBits) - return ((m_sizeInBits - 1)/8 + 1); + if (m_bitvecSizeInBits) + return ((m_bitvecSizeInBits - 1)/8 + 1); else return 0; } inline int Info::GetNBlocks() const { - return m_sizeInBits; + return m_bitvecSizeInBits; } inline long long Info::GetFileSize() const @@ -457,7 +452,7 @@ inline bool Info::IsComplete() const inline bool Info::IsAnythingEmptyInRng(int firstIdx, int lastIdx) const { // TODO rewrite to use full byte comparisons outside of edges ? - // Also, it is always called with fisrtsdx = 0, lastIdx = m_sizeInBits. + // Also, it seems to be always called with firstIdx = 0, lastIdx = m_bitvecSizeInBits. for (int i = firstIdx; i < lastIdx; ++i) if (! TestBitWritten(i)) return true; @@ -466,7 +461,7 @@ inline bool Info::IsAnythingEmptyInRng(int firstIdx, int lastIdx) const inline void Info::UpdateDownloadCompleteStatus() { - m_complete = ! IsAnythingEmptyInRng(0, m_sizeInBits); + m_complete = ! IsAnythingEmptyInRng(0, m_bitvecSizeInBits); } inline long long Info::GetBufferSize() const diff --git a/src/XrdPfc/XrdPfcPrint.cc b/src/XrdPfc/XrdPfcPrint.cc index 4f35fc075a7..d251cdf43d9 100644 --- a/src/XrdPfc/XrdPfcPrint.cc +++ b/src/XrdPfc/XrdPfcPrint.cc @@ -63,7 +63,7 @@ void Print::printFile(const std::string& path) XrdOssDF* fh = m_oss->newFile(m_ossUser); fh->Open((path).c_str(),O_RDONLY, 0600, m_env); - XrdSysTrace tr(""); tr.What = 2; + XrdSysTrace tr("XrdPfcPrint"); tr.What = 2; Info cfi(&tr); if ( ! cfi.Read(fh, path.c_str())) @@ -117,7 +117,8 @@ void Print::printFile(const std::string& path) "Record", "Attach", "Detach", "Duration", "N_ios", "N_mrg", "B_hit[kB]", "B_miss[kB]", "B_bypass[kB]"); int idx = 1; - for (std::vector::const_iterator it = store.m_astats.begin(); it != store.m_astats.end(); ++it) + const std::vector &astats = cfi.RefAStats(); + for (std::vector::const_iterator it = astats.begin(); it != astats.end(); ++it) { const int MM = 128; char s[MM]; diff --git a/src/XrdPfc/XrdPfcPurge.cc b/src/XrdPfc/XrdPfcPurge.cc index b7b04a21ec7..1b286885ae8 100644 --- a/src/XrdPfc/XrdPfcPurge.cc +++ b/src/XrdPfc/XrdPfcPurge.cc @@ -446,7 +446,7 @@ class FPurgeState void TraverseNamespace(XrdOssDF *iOssDF) { - static const char *trc_pfx = "FPurgeState::TraverseNamespace "; + static const char *trc_pfx = "FPurgeState::TraverseNamespace "; char fname[256]; struct stat fstat; @@ -456,8 +456,19 @@ class FPurgeState iOssDF->StatRet(&fstat); - while (iOssDF->Readdir(fname, 256) >= 0) + while (true) { + int rc = iOssDF->Readdir(fname, 256); + + if (rc == -ENOENT) { + TRACE_PURGE(" Skipping ENOENT dir entry [" << fname << "]."); + continue; + } + if (rc != XrdOssOK) { + TRACE(Error, trc_pfx << "Readdir error at " << m_current_path << ", err " << XrdSysE2T(-rc) << "."); + break; + } + TRACE_PURGE(" Readdir [" << fname << "]"); if (fname[0] == 0) { @@ -476,9 +487,9 @@ class FPurgeState { if (m_oss_at.Opendir(*iOssDF, fname, env, dfh) == XrdOssOK) { - cd_down(fname); + cd_down(fname); TRACE_PURGE(" cd_down -> [" << m_current_path << "]."); TraverseNamespace(dfh); - cd_up(); + cd_up(); TRACE_PURGE(" cd_up -> [" << m_current_path << "]."); } else TRACE(Warning, trc_pfx << "could not opendir [" << m_current_path << fname << "], " << XrdSysE2T(errno));