From 1e20fbd297249bf6a215678a8cdbe867d146f792 Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Sat, 11 Mar 2023 10:53:54 -0600 Subject: [PATCH 1/8] Add helper class for parsing cache-control header The HTTP `cache-control` header has a well-defined set of potential values. This helper class will allow us to central its parsing. --- src/XrdOuc/XrdOucCacheDirective.cc | 85 ++++++++++++++++++++++++++++++ src/XrdOuc/XrdOucCacheDirective.hh | 82 ++++++++++++++++++++++++++++ src/XrdUtils.cmake | 1 + 3 files changed, 168 insertions(+) create mode 100644 src/XrdOuc/XrdOucCacheDirective.cc create mode 100644 src/XrdOuc/XrdOucCacheDirective.hh diff --git a/src/XrdOuc/XrdOucCacheDirective.cc b/src/XrdOuc/XrdOucCacheDirective.cc new file mode 100644 index 00000000000..b203d77c7b5 --- /dev/null +++ b/src/XrdOuc/XrdOucCacheDirective.cc @@ -0,0 +1,85 @@ +/******************************************************************************/ +/* */ +/* X r d O u c C a c h e D i r e c t i v e . c c */ +/* */ +/* (c) 2023 by the Board of Trustees of the Leland Stanford, Jr., University */ +/* All Rights Reserved */ +/* */ +/* This file is part of the XRootD software suite. */ +/* */ +/* XRootD is free software: you can redistribute it and/or modify it under */ +/* the terms of the GNU Lesser General Public License as published by the */ +/* Free Software Foundation, either version 3 of the License, or (at your */ +/* option) any later version. */ +/* */ +/* XRootD is distributed in the hope that it will be useful, but WITHOUT */ +/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ +/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ +/* License for more details. */ +/* */ +/* You should have received a copy of the GNU Lesser General Public License */ +/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ +/* COPYING (GPL license). If not, see . */ +/* */ +/* The copyright holder's institutional names and contributor's names may not */ +/* be used to endorse or promote products derived from this software without */ +/* specific prior written permission of the institution or contributor. */ +/******************************************************************************/ + +#include "XrdOuc/XrdOucCacheDirective.hh" +#include "XrdOuc/XrdOucString.hh" + +#include + +XrdOucCacheDirective::XrdOucCacheDirective(const std::string &header) +{ + XrdOucString header_ouc(header.c_str()); + int from = 0; + XrdOucString directive; + while ((from = header_ouc.tokenize(directive, from, ',')) != -1) { + + // Trim out whitespace, make lowercase + int begin = 0; + while (begin < directive.length() && !isgraph(directive[begin])) {begin++;} + if (begin) directive.erasefromstart(begin); + if (directive.length()) { + int endtrim = 0; + while (endtrim < directive.length() && !isgraph(directive[directive.length() - endtrim - 1])) {endtrim++;} + if (endtrim) directive.erasefromend(endtrim); + } + if (directive.length() == 0) {continue;} + directive.lower(0); + + int pos = directive.find('='); + // No known cache directive command is larger than 19 characters; + // use this fact so we can have a statically sized buffer. + if (pos > 19 || directive.length() > 19) { + m_unknown.push_back(directive.c_str()); + continue; + } + char command[20]; + command[19] = '\0'; + if (pos == STR_NPOS) { + strncpy(command, directive.c_str(), 19); + } else { + memcpy(command, directive.c_str(), pos); + command[pos] = '\0'; + } + if (!strcmp(command, "no-cache")) { + m_no_cache = true; + } else if (!strcmp(command, "no-store")) { + m_no_store = true; + } else if (!strcmp(command, "only-if-cached")) { + m_only_if_cached = true; + } else if (!strcmp(command, "max-age")) { + std::string value(directive.c_str() + pos + 1); + try { + m_max_age = std::stoi(value); + } catch (...) { + m_unknown.push_back(directive.c_str()); + } + } else { + m_unknown.push_back(directive.c_str()); + } + } +} diff --git a/src/XrdOuc/XrdOucCacheDirective.hh b/src/XrdOuc/XrdOucCacheDirective.hh new file mode 100644 index 00000000000..d3f778f8c19 --- /dev/null +++ b/src/XrdOuc/XrdOucCacheDirective.hh @@ -0,0 +1,82 @@ +/******************************************************************************/ +/* */ +/* X r d O u c C a c h e D i r e c t i v e . h h */ +/* */ +/* (c) 2023 by the Board of Trustees of the Leland Stanford, Jr., University */ +/* All Rights Reserved */ +/* */ +/* This file is part of the XRootD software suite. */ +/* */ +/* XRootD is free software: you can redistribute it and/or modify it under */ +/* the terms of the GNU Lesser General Public License as published by the */ +/* Free Software Foundation, either version 3 of the License, or (at your */ +/* option) any later version. */ +/* */ +/* XRootD is distributed in the hope that it will be useful, but WITHOUT */ +/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ +/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ +/* License for more details. */ +/* */ +/* You should have received a copy of the GNU Lesser General Public License */ +/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ +/* COPYING (GPL license). If not, see . */ +/* */ +/* The copyright holder's institutional names and contributor's names may not */ +/* be used to endorse or promote products derived from this software without */ +/* specific prior written permission of the institution or contributor. */ +/******************************************************************************/ + +#pragma once + +#include +#include + +/** + * Helper class for parsing the known cache headers. + * + * Purposely hews to the HTTP cache-control headers where possible. See + * + * for a detailed explanation of the different directives. + */ +class XrdOucCacheDirective { + +public: + /** + * Given a header value, parse out to a list of known cache directives + */ + XrdOucCacheDirective(const std::string &header); + + /** + * Returns a list of unknown directives provided. + */ + const std::vector UnknownDirectives() const; + + /** + * Returns true if the `no-cache` directive is present. + */ + bool NoCache() const {return m_no_cache;} + + /** + * Returns true if the `no-store` directive is present. + */ + bool NoStore() const {return m_no_store;} + + /** + * Returns true if the `only-if-cached` directive is present. + */ + bool OnlyIfCached() const {return m_only_if_cached;} + + /** + * Returns the value of the `max-age` directive; if the directive + * is not present, returns -1. + */ + int MaxAge() const {return m_max_age;} + +private: + bool m_no_cache{false}; + bool m_no_store{false}; + bool m_only_if_cached{false}; + int m_max_age{-1}; + + std::vector m_unknown; +}; diff --git a/src/XrdUtils.cmake b/src/XrdUtils.cmake index 2a482ab0375..243057f62cf 100644 --- a/src/XrdUtils.cmake +++ b/src/XrdUtils.cmake @@ -102,6 +102,7 @@ set ( XrdOucSources XrdOuc/XrdOucBuffer.cc XrdOuc/XrdOucBuffer.hh XrdOuc/XrdOucCache.cc XrdOuc/XrdOucCache.hh XrdOuc/XrdOucCacheStats.hh + XrdOuc/XrdOucCacheDirective.cc XrdOuc/XrdOucCacheDirective.hh XrdOuc/XrdOucCallBack.cc XrdOuc/XrdOucCallBack.hh XrdOuc/XrdOucChkPnt.hh XrdOuc/XrdOucCRC.cc XrdOuc/XrdOucCRC.hh From b0c6baab7c55357ff35c1251550b14665fb4d4c5 Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Sat, 11 Mar 2023 10:57:48 -0600 Subject: [PATCH 2/8] Add initial `cache-control` support to PFC This provides the PFC with the ability to parse the `cache-control` CGI, using the XrdOucCacheDirective helper class, and understand the no-store and no-cache directives. *Note* this handles the simple cases only -- if the file in the cache was already opened by another client then we will serve from that file according to the original open rules. --- src/XrdPfc/XrdPfc.cc | 14 ++++++++++++++ src/XrdPfc/XrdPfcFile.cc | 10 +++++----- src/XrdPfc/XrdPfcFile.hh | 4 ++++ src/XrdPfc/XrdPfcIOFile.hh | 2 ++ 4 files changed, 25 insertions(+), 5 deletions(-) diff --git a/src/XrdPfc/XrdPfc.cc b/src/XrdPfc/XrdPfc.cc index d6b7e261790..46d7c4d3cfe 100644 --- a/src/XrdPfc/XrdPfc.cc +++ b/src/XrdPfc/XrdPfc.cc @@ -24,6 +24,7 @@ #include "XrdCl/XrdClConstants.hh" #include "XrdCl/XrdClURL.hh" +#include "XrdOuc/XrdOucCacheDirective.hh" #include "XrdOuc/XrdOucEnv.hh" #include "XrdOuc/XrdOucUtils.hh" @@ -230,6 +231,19 @@ XrdOucCacheIO *Cache::Attach(XrdOucCacheIO *io, int Options) return io; } + XrdCl::URL url(io->Path()); + auto params = url.GetParams(); + auto iter = params.find("cache-control"); + if (iter != params.end()) + { + XrdOucCacheDirective directive(iter->second); + if (directive.NoStore() || directive.NoCache()) + { + TRACE(Debug, tpfx << "Disabling set store"); + iof->SetStore(false); + } + } + cio = iof; } diff --git a/src/XrdPfc/XrdPfcFile.cc b/src/XrdPfc/XrdPfcFile.cc index c684ec27ce0..577cbc0878f 100644 --- a/src/XrdPfc/XrdPfcFile.cc +++ b/src/XrdPfc/XrdPfcFile.cc @@ -656,7 +656,7 @@ int File::Read(IO *io, char* iUserBuff, long long iUserOff, int iUserSize, ReadR // Shortcut -- file is fully downloaded. - if (m_cfi.IsComplete()) + if (m_store && m_cfi.IsComplete()) { m_state_cond.UnLock(); int ret = m_data_file->Read(iUserBuff, iUserOff, iUserSize); @@ -685,7 +685,7 @@ int File::ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh) // Shortcut -- file is fully downloaded. - if (m_cfi.IsComplete()) + if (m_store && m_cfi.IsComplete()) { m_state_cond.UnLock(); int ret = m_data_file->ReadV(const_cast(readV), readVnum); @@ -751,7 +751,7 @@ int File::ReadOpusCoalescere(IO *io, const XrdOucIOVec *readV, int readVnum, overlap(block_idx, m_block_size, iUserOff, iUserSize, off, blk_off, size); // In RAM or incoming? - if (bi != m_block_map.end()) + if (m_store && bi != m_block_map.end()) { inc_ref_count(bi->second); TRACEF(Dump, tpfx << (void*) iUserBuff << " inc_ref_count for existing block " << bi->second << " idx = " << block_idx); @@ -782,7 +782,7 @@ int File::ReadOpusCoalescere(IO *io, const XrdOucIOVec *readV, int readVnum, lbe = LB_other; } // On disk? - else if (m_cfi.TestBitWritten(offsetIdx(block_idx))) + else if (m_store && m_cfi.TestBitWritten(offsetIdx(block_idx))) { TRACEF(DumpXL, tpfx << "read from disk " << (void*)iUserBuff << " idx = " << block_idx); @@ -804,7 +804,7 @@ int File::ReadOpusCoalescere(IO *io, const XrdOucIOVec *readV, int readVnum, read_req = new ReadRequest(io, rh); // Is there room for one more RAM Block? - Block *b = PrepareBlockRequest(block_idx, io, read_req, false); + Block *b = m_store ? PrepareBlockRequest(block_idx, io, read_req, false) : nullptr; if (b) { TRACEF(Dump, tpfx << "inc_ref_count new " << (void*)iUserBuff << " idx = " << block_idx); diff --git a/src/XrdPfc/XrdPfcFile.hh b/src/XrdPfc/XrdPfcFile.hh index 0fcb7c0a021..40271bd1b1d 100644 --- a/src/XrdPfc/XrdPfcFile.hh +++ b/src/XrdPfc/XrdPfcFile.hh @@ -301,6 +301,8 @@ public: void initiate_emergency_shutdown(); bool is_in_emergency_shutdown() { return m_in_shutdown; } + void SetStore(bool val) {m_store = val;} + private: //! Constructor. File(const std::string &path, long long offset, long long fileSize); @@ -310,6 +312,8 @@ private: static const char *m_traceID; + bool m_store{true}; //!< indicates the file should cached + int m_ref_cnt; //!< number of references from IO or sync XrdOssDF *m_data_file; //!< file handle for data file on disk diff --git a/src/XrdPfc/XrdPfcIOFile.hh b/src/XrdPfc/XrdPfcIOFile.hh index 7e771dbbfb2..cb46b7370c8 100644 --- a/src/XrdPfc/XrdPfcIOFile.hh +++ b/src/XrdPfc/XrdPfcIOFile.hh @@ -77,6 +77,8 @@ public: long long FSize() override; + void SetStore(bool val) {if (m_file) m_file->SetStore(val);} + private: File *m_file; From e63f575b1f721da7a837433d28cbbd0790208c1a Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Sat, 11 Mar 2023 11:08:46 -0600 Subject: [PATCH 3/8] Add support for the cache-control header Fixes #1886 --- src/XrdHttp/XrdHttpProtocol.cc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/XrdHttp/XrdHttpProtocol.cc b/src/XrdHttp/XrdHttpProtocol.cc index 40c455656e5..bbcf842425e 100644 --- a/src/XrdHttp/XrdHttpProtocol.cc +++ b/src/XrdHttp/XrdHttpProtocol.cc @@ -1041,6 +1041,10 @@ int XrdHttpProtocol::Config(const char *ConfigFN, XrdOucEnv *myEnv) { return 1; } +// Some headers must always be converted to CGI key=value pairs +// + hdr2cgimap["Cache-Control"] = "cache-control"; + // Test if XrdEC is loaded if (getenv("XRDCL_EC")) usingEC = true; From b8f58b5489dd0388b26f9dba619c53b71039e1ba Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Sat, 11 Mar 2023 14:27:57 -0600 Subject: [PATCH 4/8] Implement the HTTP cache control `only if cached` behavior. With this, clients will receive the RFC standard 504 Gateway Timeout if the file is not already cached. This subtly changes the semantics of the creation time in the cinfo file to be when the first block of data is written as opposed to when the cinfo is created (as opening but not reading the file will create the cinfo file). --- src/XrdHttp/XrdHttpReq.cc | 16 ++++++++++++++++ src/XrdHttp/XrdHttpReq.hh | 2 ++ src/XrdPfc/XrdPfc.cc | 34 +++++++++++++++++++++------------- src/XrdPfc/XrdPfcFile.cc | 7 ++++++- src/XrdPfc/XrdPfcFile.hh | 2 ++ src/XrdPfc/XrdPfcIOFile.cc | 4 ++++ src/XrdPfc/XrdPfcIOFile.hh | 2 -- src/XrdPfc/XrdPfcInfo.cc | 5 +++-- src/XrdPfc/XrdPfcInfo.hh | 7 ++++++- 9 files changed, 60 insertions(+), 19 deletions(-) diff --git a/src/XrdHttp/XrdHttpReq.cc b/src/XrdHttp/XrdHttpReq.cc index 6732578cdfb..ff7300ccd5f 100644 --- a/src/XrdHttp/XrdHttpReq.cc +++ b/src/XrdHttp/XrdHttpReq.cc @@ -44,6 +44,7 @@ #include #include #include "XrdSys/XrdSysPlatform.hh" +#include "XrdOuc/XrdOucCacheDirective.hh" #include "XrdOuc/XrdOucEnv.hh" #include "XrdHttpProtocol.hh" #include "Xrd/XrdLink.hh" @@ -317,6 +318,9 @@ int XrdHttpReq::parseLine(char *line, int len) { hdr2cgistr.append("="); hdr2cgistr.append(s); } + if (!strcasecmp(key, "Cache-Control")) { + m_cache_control.assign(val, line + len - val); + } } @@ -2320,6 +2324,17 @@ int XrdHttpReq::PostProcessHTTPReq(bool final_) { } long object_age = time(NULL) - filemodtime; responseHeader += std::string("Age: ") + std::to_string(object_age < 0 ? 0 : object_age); + + + if (!m_cache_control.empty()) { + XrdOucCacheDirective directive(m_cache_control); + // To avoid some off-by-one errors, treat an object of age 1 as a cache miss. + if (directive.OnlyIfCached() && object_age <= 1) { + prot->SendSimpleResp(504, "Gateway Timeout", + responseHeader.empty() ? nullptr : responseHeader.c_str(), nullptr, 0, false); + return -1; + } + } } if (rwOps.size() == 0) { @@ -2979,6 +2994,7 @@ void XrdHttpReq::reset() { m_req_digest.clear(); m_resource_with_digest = ""; + m_cache_control.clear(); headerok = false; keepalive = true; length = 0; diff --git a/src/XrdHttp/XrdHttpReq.hh b/src/XrdHttp/XrdHttpReq.hh index f6dbdf9c5af..e12f30186f9 100644 --- a/src/XrdHttp/XrdHttpReq.hh +++ b/src/XrdHttp/XrdHttpReq.hh @@ -242,6 +242,8 @@ public: XrdOucString m_resource_with_digest; /// The computed digest for the HTTP response header. std::string m_digest_header; + /// The contents of the cache-control header for the request + std::string m_cache_control; /// Additional opaque info that may come from the hdr2cgi directive std::string hdr2cgistr; diff --git a/src/XrdPfc/XrdPfc.cc b/src/XrdPfc/XrdPfc.cc index 46d7c4d3cfe..0e50521f385 100644 --- a/src/XrdPfc/XrdPfc.cc +++ b/src/XrdPfc/XrdPfc.cc @@ -231,19 +231,6 @@ XrdOucCacheIO *Cache::Attach(XrdOucCacheIO *io, int Options) return io; } - XrdCl::URL url(io->Path()); - auto params = url.GetParams(); - auto iter = params.find("cache-control"); - if (iter != params.end()) - { - XrdOucCacheDirective directive(iter->second); - if (directive.NoStore() || directive.NoCache()) - { - TRACE(Debug, tpfx << "Disabling set store"); - iof->SetStore(false); - } - } - cio = iof; } @@ -478,6 +465,27 @@ File* Cache::GetFile(const std::string& path, IO* io, long long off, long long f file = File::FileOpen(path, off, filesize); } + if (file) + { + XrdCl::URL url(io->Path()); + auto params = url.GetParams(); + auto iter = params.find("cache-control"); + if (iter != params.end()) + { + XrdOucCacheDirective directive(iter->second); + if (directive.NoStore() || directive.NoCache()) + { + TRACE(Debug, "GetFile: Disabling set store"); + file->SetStore(false); + } + if (directive.OnlyIfCached()) + { + TRACE(Debug, "GetFile: Setting `only if cached` mode"); + file->SetOnlyIfCached(true); + } + } + } + { XrdSysCondVarHelper lock(&m_active_cond); diff --git a/src/XrdPfc/XrdPfcFile.cc b/src/XrdPfc/XrdPfcFile.cc index 577cbc0878f..f5f3eb5aa80 100644 --- a/src/XrdPfc/XrdPfcFile.cc +++ b/src/XrdPfc/XrdPfcFile.cc @@ -313,8 +313,10 @@ void File::AddIO(IO *io) insert_remote_location(loc); - if (m_prefetch_state == kStopped) + // In the case of "only if cached" mode, do not fire off a prefetch. + if (!m_only_if_cached && m_prefetch_state == kStopped) { + TRACEF(Debug, "AddIO() io = " << (void*)io << " enabling prefetch"); m_prefetch_state = kOn; cache()->RegisterPrefetchFile(this); } @@ -943,6 +945,9 @@ void File::WriteBlockToDisk(Block* b) long long size = b->get_size(); ssize_t retval; + if (m_cfi.GetCreationTime() == 0) + m_cfi.SetCreationTime(time(NULL)); + if (m_cfi.IsCkSumCache()) if (b->has_cksums()) retval = m_data_file->pgWrite(b->get_buff(), offset, size, b->ref_cksum_vec().data(), 0); diff --git a/src/XrdPfc/XrdPfcFile.hh b/src/XrdPfc/XrdPfcFile.hh index 40271bd1b1d..e62ff714e68 100644 --- a/src/XrdPfc/XrdPfcFile.hh +++ b/src/XrdPfc/XrdPfcFile.hh @@ -302,6 +302,7 @@ public: bool is_in_emergency_shutdown() { return m_in_shutdown; } void SetStore(bool val) {m_store = val;} + void SetOnlyIfCached(bool val) {m_only_if_cached = val;} private: //! Constructor. @@ -313,6 +314,7 @@ private: static const char *m_traceID; bool m_store{true}; //!< indicates the file should cached + bool m_only_if_cached{false}; //!< indicates the 'only if cached' directive is set; disables prefetch int m_ref_cnt; //!< number of references from IO or sync diff --git a/src/XrdPfc/XrdPfcIOFile.cc b/src/XrdPfc/XrdPfcIOFile.cc index d0b3ebef169..2a7ae8158de 100644 --- a/src/XrdPfc/XrdPfcIOFile.cc +++ b/src/XrdPfc/XrdPfcIOFile.cc @@ -98,6 +98,10 @@ int IOFile::initCachedStat() // We are arguably abusing the mtime to be the creation time of the file; then ctime becomes the // last time additional data was cached. tmpStat.st_mtime = info.GetCreationTime(); + // If no blocks have been written, assume it's newly created + if (!tmpStat.st_mtime) { + tmpStat.st_mtime = time(NULL); + } TRACEIO(Info, trace_pfx << "successfully read size " << tmpStat.st_size << " and creation time " << tmpStat.st_mtime << " from info file"); res = 0; } diff --git a/src/XrdPfc/XrdPfcIOFile.hh b/src/XrdPfc/XrdPfcIOFile.hh index cb46b7370c8..7e771dbbfb2 100644 --- a/src/XrdPfc/XrdPfcIOFile.hh +++ b/src/XrdPfc/XrdPfcIOFile.hh @@ -77,8 +77,6 @@ public: long long FSize() override; - void SetStore(bool val) {if (m_file) m_file->SetStore(val);} - private: File *m_file; diff --git a/src/XrdPfc/XrdPfcInfo.cc b/src/XrdPfc/XrdPfcInfo.cc index d063affd33d..82c2fbe55b6 100644 --- a/src/XrdPfc/XrdPfcInfo.cc +++ b/src/XrdPfc/XrdPfcInfo.cc @@ -166,7 +166,8 @@ void Info::SetBufferSizeFileSizeAndCreationTime(long long bs, long long fs) m_store.m_buffer_size = bs; m_store.m_file_size = fs; ResizeBits(); - m_store.m_creationTime = time(0); + // Defer setting the creation time until we've had a single block write + m_store.m_creationTime = 0; } //------------------------------------------------------------------------------ @@ -662,4 +663,4 @@ void Info::TestCksumStuff() } } -#endif \ No newline at end of file +#endif diff --git a/src/XrdPfc/XrdPfcInfo.hh b/src/XrdPfc/XrdPfcInfo.hh index 36a5bc37ad0..f49a1afcc87 100644 --- a/src/XrdPfc/XrdPfcInfo.hh +++ b/src/XrdPfc/XrdPfcInfo.hh @@ -276,10 +276,15 @@ public: const std::vector& RefAStats() const { return m_astats; } //--------------------------------------------------------------------- - //! Get file size + //! Get file creation time //--------------------------------------------------------------------- time_t GetCreationTime() const { return m_store.m_creationTime; } + //--------------------------------------------------------------------- + //! Set the file creation time + // + void SetCreationTime(time_t new_time) { m_store.m_creationTime = new_time; } + //--------------------------------------------------------------------- //! Get cksum, MD5 is for backward compatibility with V2 and V3. //--------------------------------------------------------------------- From ef7527ab19d3b16ed1c015a6186cd54f6a4ee913 Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Sun, 12 Mar 2023 14:38:52 -0500 Subject: [PATCH 5/8] Add support for `max-age` directive This allows the client to specify the maximum age of the cached data they are willing to accept. This currently only works for the first attach of the data. --- src/XrdPfc/XrdPfc.cc | 22 ++++++++++++++++++++++ src/XrdPfc/XrdPfcFile.hh | 2 ++ src/XrdPfc/XrdPfcIO.hh | 5 +++++ src/XrdPfc/XrdPfcIOFile.hh | 7 ++++++- 4 files changed, 35 insertions(+), 1 deletion(-) diff --git a/src/XrdPfc/XrdPfc.cc b/src/XrdPfc/XrdPfc.cc index 0e50521f385..9b6670cf5d5 100644 --- a/src/XrdPfc/XrdPfc.cc +++ b/src/XrdPfc/XrdPfc.cc @@ -465,6 +465,7 @@ File* Cache::GetFile(const std::string& path, IO* io, long long off, long long f file = File::FileOpen(path, off, filesize); } + auto should_invalidate = false; if (file) { XrdCl::URL url(io->Path()); @@ -483,7 +484,28 @@ File* Cache::GetFile(const std::string& path, IO* io, long long off, long long f TRACE(Debug, "GetFile: Setting `only if cached` mode"); file->SetOnlyIfCached(true); } + + int max_age; + if ((max_age = directive.MaxAge()) > 0) + { + time_t cur_age = time(NULL) - file->GetCreationTime(); + should_invalidate = cur_age > max_age; + } + } + } + if (should_invalidate) { + { + XrdSysCondVarHelper lock(&m_active_cond); + m_active.erase(it); + delete file; } + UnlinkFile(path, false); + { + XrdSysCondVarHelper lock(&m_active_cond); + it = m_active.insert(std::make_pair(path, (File*) 0)).first; + } + file = File::FileOpen(path, off, filesize); + io->ResetCachedStat(); } { diff --git a/src/XrdPfc/XrdPfcFile.hh b/src/XrdPfc/XrdPfcFile.hh index e62ff714e68..44560a58249 100644 --- a/src/XrdPfc/XrdPfcFile.hh +++ b/src/XrdPfc/XrdPfcFile.hh @@ -303,6 +303,8 @@ public: void SetStore(bool val) {m_store = val;} void SetOnlyIfCached(bool val) {m_only_if_cached = val;} + time_t GetCreationTime() const {return m_cfi.GetCreationTime();} + private: //! Constructor. diff --git a/src/XrdPfc/XrdPfcIO.hh b/src/XrdPfc/XrdPfcIO.hh index 1e5f5885b71..c26e567cfb9 100644 --- a/src/XrdPfc/XrdPfcIO.hh +++ b/src/XrdPfc/XrdPfcIO.hh @@ -48,6 +48,11 @@ public: XrdOucCacheIO* GetInput(); + //--------------------------------------------------------------------- + //! Reset the cached statistics + //--------------------------------------------------------------------- + virtual void ResetCachedStat() {} + protected: Cache &m_cache; //!< reference to Cache object const char *m_traceID; diff --git a/src/XrdPfc/XrdPfcIOFile.hh b/src/XrdPfc/XrdPfcIOFile.hh index 7e771dbbfb2..0d8c14bc0b8 100644 --- a/src/XrdPfc/XrdPfcIOFile.hh +++ b/src/XrdPfc/XrdPfcIOFile.hh @@ -36,7 +36,7 @@ namespace XrdPfc //! \brief Downloads original file into a single file on local disk. //! Handles read requests as they come along. //---------------------------------------------------------------------------- -class IOFile : public IO +class IOFile final : public IO { public: IOFile(XrdOucCacheIO *io, Cache &cache); @@ -77,6 +77,11 @@ public: long long FSize() override; + //--------------------------------------------------------------------- + //! Reset the cached statistics + //--------------------------------------------------------------------- + virtual void ResetCachedStat() override {if (m_localStat) {delete m_localStat; m_localStat = nullptr;}} + private: File *m_file; From f1c2a5eb5b4ab8a05a1a49da6db6e735cc7b176c Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Sun, 12 Mar 2023 21:57:49 -0500 Subject: [PATCH 6/8] Permit XrdPfc to reopen files internally after unlink Previously, an Unlink of a file that was opened would cause a failure on future file reads. --- src/XrdPfc/XrdPfc.cc | 14 +++----------- src/XrdPfc/XrdPfcFile.cc | 37 ++++++++++++++++++++++++++++++++++++- src/XrdPfc/XrdPfcFile.hh | 2 ++ 3 files changed, 41 insertions(+), 12 deletions(-) diff --git a/src/XrdPfc/XrdPfc.cc b/src/XrdPfc/XrdPfc.cc index 9b6670cf5d5..06cb65db7c4 100644 --- a/src/XrdPfc/XrdPfc.cc +++ b/src/XrdPfc/XrdPfc.cc @@ -1044,8 +1044,8 @@ int Cache::Stat(const char *curl, struct stat &sbuff) //______________________________________________________________________________ // virtual method of XrdOucCache. //! -//! @return <0 - Stat failed, value is -errno. -//! =0 - Stat succeeded, sbuff holds stat information. +//! @return <0 - Unlink failed, value is -errno. +//! =0 - Unlink succeeded. //------------------------------------------------------------------------------ int Cache::Unlink(const char *curl) @@ -1084,8 +1084,7 @@ int Cache::UnlinkFile(const std::string& f_name, bool fail_if_open) } file = it->second; - file->initiate_emergency_shutdown(); - it->second = 0; + file->initiate_reopen(); } else { @@ -1105,12 +1104,5 @@ int Cache::UnlinkFile(const std::string& f_name, bool fail_if_open) int i_ret = m_oss->Unlink(i_name.c_str()); TRACE(Debug, "UnlinkCommon " << f_name << ", f_ret=" << f_ret << ", i_ret=" << i_ret); - - { - XrdSysCondVarHelper lock(&m_active_cond); - - m_active.erase(it); - } - return std::min(f_ret, i_ret); } diff --git a/src/XrdPfc/XrdPfcFile.cc b/src/XrdPfc/XrdPfcFile.cc index f5f3eb5aa80..8fdd91cfecd 100644 --- a/src/XrdPfc/XrdPfcFile.cc +++ b/src/XrdPfc/XrdPfcFile.cc @@ -111,7 +111,6 @@ File* File::FileOpen(const std::string &path, long long offset, long long fileSi void File::initiate_emergency_shutdown() { - // Called from Cache::Unlink() when the file is currently open. // Cache::Unlink is also called on FSync error and when wrong number of bytes // is received from a remote read. // @@ -139,6 +138,22 @@ void File::initiate_emergency_shutdown() //------------------------------------------------------------------------------ +void File::initiate_reopen() +{ + // Called from Cache::Unlink() when the file is currently open. + XrdSysCondVarHelper _lck(m_state_cond); + + m_needs_reopen = true; + + if (m_prefetch_state != kStopped && m_prefetch_state != kComplete) + { + m_prefetch_state = kStopped; + cache()->DeRegisterPrefetchFile(this); + } +} + +//------------------------------------------------------------------------------ + Stats File::DeltaStatsFromLastCall() { // Not locked, only used from Cache / Purge thread. @@ -654,6 +669,16 @@ int File::Read(IO *io, char* iUserBuff, long long iUserOff, int iUserSize, ReadR { m_state_cond.UnLock(); return m_in_shutdown ? -ENOENT : -EBADF; + } else if (m_needs_reopen) + { + m_state_cond.UnLock(); + if (!Open()) { + XrdSysCondVarHelper _lck(m_state_cond); + m_needs_reopen = false; + return -EIO; + } + m_state_cond.Lock(); + m_needs_reopen = false; } // Shortcut -- file is fully downloaded. @@ -683,6 +708,16 @@ int File::ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh) { m_state_cond.UnLock(); return m_in_shutdown ? -ENOENT : -EBADF; + } else if (m_needs_reopen) + { + m_state_cond.UnLock(); + if (!Open()) { + XrdSysCondVarHelper _lck(m_state_cond); + m_needs_reopen = false; + return -EIO; + } + m_state_cond.Lock(); + m_needs_reopen = false; } // Shortcut -- file is fully downloaded. diff --git a/src/XrdPfc/XrdPfcFile.hh b/src/XrdPfc/XrdPfcFile.hh index 44560a58249..d980448da96 100644 --- a/src/XrdPfc/XrdPfcFile.hh +++ b/src/XrdPfc/XrdPfcFile.hh @@ -299,6 +299,7 @@ public: int dec_ref_cnt() { return --m_ref_cnt; } void initiate_emergency_shutdown(); + void initiate_reopen(); bool is_in_emergency_shutdown() { return m_in_shutdown; } void SetStore(bool val) {m_store = val;} @@ -344,6 +345,7 @@ private: bool m_in_sync; bool m_detach_time_logged; bool m_in_shutdown; //!< file is in emergency shutdown due to irrecoverable error or unlink request + bool m_needs_reopen{false}; //!< cached file was unlinked while open; needs to be reopened on next read. // Block state and management From d2257bf7347611031e3f4fa59e62f3ffc5934795 Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Sun, 12 Mar 2023 21:59:04 -0500 Subject: [PATCH 7/8] Allow a XrdPosix operation to target only the cache Permits an invalidation of the cache without trying to delete the path from upstream as well. --- src/XrdPosix/XrdPosixXrootd.cc | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/XrdPosix/XrdPosixXrootd.cc b/src/XrdPosix/XrdPosixXrootd.cc index 64494207bc6..083ab1e2540 100644 --- a/src/XrdPosix/XrdPosixXrootd.cc +++ b/src/XrdPosix/XrdPosixXrootd.cc @@ -1342,7 +1342,12 @@ int XrdPosixXrootd::Unlink(const char *path) if (XrdPosixGlobals::theCache) {LfnPath remf("unlink", path); if (!remf.path) return -1; - XrdPosixGlobals::theCache->Unlink(remf.path); + auto retval = XrdPosixGlobals::theCache->Unlink(remf.path); + // If the path specifies to only target the cache, stop here. + auto iter = admin.Url.GetParams().find("xrdposix.target"); + if (iter != admin.Url.GetParams().end() && iter->second == "cache") { + return retval; + } } // Issue the UnLink From 36df693600a5bb9d98a2421ad239115aefdf9dad Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Sun, 12 Mar 2023 22:00:11 -0500 Subject: [PATCH 8/8] Allow the HTTP server to invalidate the cache on max-age With this, the HTTP server can invalidate the cache after it has opened a file if the file proves to be older than the requested max-age. A side-effect is refactoring the state machine to be simpler -- for GET, each request state has a single operation that may occur. --- src/XrdHttp/XrdHttpReq.cc | 225 +++++++++++++++++++++----------------- src/XrdHttp/XrdHttpReq.hh | 14 +++ 2 files changed, 140 insertions(+), 99 deletions(-) diff --git a/src/XrdHttp/XrdHttpReq.cc b/src/XrdHttp/XrdHttpReq.cc index ff7300ccd5f..134f1b56cd2 100644 --- a/src/XrdHttp/XrdHttpReq.cc +++ b/src/XrdHttp/XrdHttpReq.cc @@ -1254,7 +1254,13 @@ int XrdHttpReq::ProcessHTTPReq() { } - + + // The reqstate parameter basically moves us through a simple state machine. + // - 0: Perform a stat on the resource + // - 1: Perform a checksum request on the resource (only if requested in header; otherwise skipped) + // - 2: Perform an open request (dirlist as appropriate). + // - 3: Unlink the underlying file (only for a too-old cache object; otherwise skipped) + // - 4+: Reads from file; if at end, perform a close. switch (reqstate) { case 0: // Stat() @@ -1267,9 +1273,32 @@ int XrdHttpReq::ProcessHTTPReq() { } return 0; - case 1: // Open() or dirlist + case 1: // Checksum request + if (!m_req_digest.empty()) { + // In this case, the Want-Digest header was set. + bool has_opaque = strchr(resourceplusopaque.c_str(), '?'); + // Note that doChksum requires that the memory stays alive until the callback is invoked. + m_resource_with_digest = resourceplusopaque; + if (has_opaque) { + m_resource_with_digest += "&cks.type="; + m_resource_with_digest += convert_digest_name(m_req_digest); + } else { + m_resource_with_digest += "?cks.type="; + m_resource_with_digest += convert_digest_name(m_req_digest); + } + if (prot->doChksum(m_resource_with_digest) < 0) { + prot->SendSimpleResp(500, NULL, NULL, (char *) "Failed to start internal checksum request to satisfy Want-Digest header.", 0, false); + return -1; + } + return 0; + } else { + // We are not + TRACEI(DEBUG, "No checksum requested; skipping to request state 2"); + reqstate += 1; + } + // fallthrough + case 2: // Open() or dirlist { - if (!prot->Bridge) { prot->SendSimpleResp(500, NULL, NULL, (char *) "prot->Bridge is NULL.", 0, false); return -1; @@ -1316,23 +1345,6 @@ int XrdHttpReq::ProcessHTTPReq() { // We don't want to be invoked again after this request is finished return 1; - } else if (!m_req_digest.empty()) { - // In this case, the Want-Digest header was set. - bool has_opaque = strchr(resourceplusopaque.c_str(), '?'); - // Note that doChksum requires that the memory stays alive until the callback is invoked. - m_resource_with_digest = resourceplusopaque; - if (has_opaque) { - m_resource_with_digest += "&cks.type="; - m_resource_with_digest += convert_digest_name(m_req_digest); - } else { - m_resource_with_digest += "?cks.type="; - m_resource_with_digest += convert_digest_name(m_req_digest); - } - if (prot->doChksum(m_resource_with_digest) < 0) { - prot->SendSimpleResp(500, NULL, NULL, (char *) "Failed to start internal checksum request to satisfy Want-Digest header.", 0, false); - return -1; - } - return 0; } else { @@ -1359,39 +1371,35 @@ int XrdHttpReq::ProcessHTTPReq() { } - case 2: // Open() in the case the user also requested a checksum. - { - if (!m_req_digest.empty()) { - // --------- OPEN - memset(&xrdreq, 0, sizeof (ClientRequest)); - xrdreq.open.requestid = htons(kXR_open); - l = resourceplusopaque.length() + 1; - xrdreq.open.dlen = htonl(l); - xrdreq.open.mode = 0; - xrdreq.open.options = htons(kXR_retstat | kXR_open_read); + case 3: // Unlink() in case of an expired cache entry. + if (m_unlink_entry) { + TRACEI(DEBUG, "Unlinking expired cache entry") + memset(&xrdreq, 0, sizeof (ClientRequest)); + xrdreq.rm.requestid = htons(kXR_rm); - if (!prot->Bridge->Run((char *) &xrdreq, (char *) resourceplusopaque.c_str(), l)) { - prot->SendSimpleResp(404, NULL, NULL, (char *) "Could not run request.", 0, false); - return -1; - } + std::string unlink_path = resourceplusopaque.c_str(); + unlink_path += std::string((unlink_path.find('?') == std::string::npos) ? "?" : "&") + "xrdposix.target=cache"; - // Prepare to chunk up the request - writtenbytes = 0; + auto data_len = unlink_path.size() + 1; + xrdreq.rm.dlen = htonl(data_len); - // We want to be invoked again after this request is finished - return 0; + if (!prot->Bridge->Run((char *) &xrdreq, (char *) unlink_path.c_str(), data_len)) { + prot->SendSimpleResp(501, NULL, NULL, (char *) "Could not run cache unlink request.", 0, false); + return -1; } + return 0; + } else { + TRACEI(DEBUG, "No unlink requested; skipping to request state 4"); + reqstate += 1; } // fallthrough - default: // Read() or Close() + default: // Read() or Close(); reqstate is 4+ { - if ( ((reqstate == 3 || (!m_req_digest.empty() && (reqstate == 4))) && (rwOps.size() > 1)) || - (writtenbytes >= length) ) { - - // Close() if this was a readv or we have finished, otherwise read the next chunk - - // --------- CLOSE + // --------- CLOSE + if ( ((reqstate == 4) && (rwOps.size() > 1)) || // In this case, we performed a ReadV and it's done. + (writtenbytes >= length) ) // No ReadV but we have completed the request. + { memset(&xrdreq, 0, sizeof (ClientRequest)); xrdreq.close.requestid = htons(kXR_close); @@ -1406,6 +1414,7 @@ int XrdHttpReq::ProcessHTTPReq() { return 1; } + // --------- READ or READV if (rwOps.size() <= 1) { // No chunks or one chunk... Request the whole file or single read @@ -1466,7 +1475,7 @@ int XrdHttpReq::ProcessHTTPReq() { return -1; } } else { - // More than one chunk to read... use readv + // --------- READV length = ReqReadV(); @@ -1479,12 +1488,12 @@ int XrdHttpReq::ProcessHTTPReq() { // We want to be invoked again after this request is finished return 0; - } + } // case 4+ - } + } // switch (reqstate) - } + } // case XrdHttpReq::rtGET case XrdHttpReq::rtPUT: { @@ -1948,6 +1957,22 @@ XrdHttpReq::PostProcessChecksum(std::string &digest_header) { } +int +XrdHttpReq::SendGetResponse() +{ + if (rwOps.size() == 0) { + if (m_transfer_encoding_chunked && m_trailer_headers) { + prot->StartChunkedResp(200, NULL, m_get_response.empty() ? NULL : m_get_response.c_str(), filesize, keepalive); + } else { + prot->SendSimpleResp(200, NULL, m_get_response.empty() ? NULL : m_get_response.c_str(), NULL, filesize, keepalive); + } + } else { + prot->SendSimpleResp(206, NULL, m_get_response.c_str(), NULL, m_get_response_length, keepalive); + } + return 0; +} + + // This is invoked by the callbacks, after something has happened in the bridge int XrdHttpReq::PostProcessHTTPReq(bool final_) { @@ -2223,10 +2248,16 @@ int XrdHttpReq::PostProcessHTTPReq(bool final_) { } - } else { - - + } // end handling of dirlist + else + { // begin handling of open-read-close + // To duplicate the state diagram from the rtGET request state + // - 0: Perform a stat on the resource + // - 1: Perform a checksum request on the resource (only if requested in header; otherwise skipped) + // - 2: Perform an open request (dirlist as appropriate). + // - 3: Unlink the underlying file (only for a too-old cache object; otherwise skipped) + // - 4+: Reads from file; if at end, perform a close. switch (reqstate) { case 0: //stat { @@ -2272,21 +2303,14 @@ int XrdHttpReq::PostProcessHTTPReq(bool final_) { // We are here in the case of a negative response in a non-manager return 0; + } // end stat + case 1: // checksum was requested and now we have its response. + { + return PostProcessChecksum(m_digest_header); } - case 1: // open - case 2: // open when digest was requested + case 2: // open { - - if (reqstate == 1 && !m_req_digest.empty()) { // We requested a checksum and now have its response. - int response = PostProcessChecksum(m_digest_header); - if (-1 == response) { - return -1; - } - return 0; - } else if (((reqstate == 2 && !m_req_digest.empty()) || - (reqstate == 1 && m_req_digest.empty())) - && (xrdresp == kXR_ok)) { - + if (xrdresp == kXR_ok) { getfhandle(); @@ -2323,8 +2347,6 @@ int XrdHttpReq::PostProcessHTTPReq(bool final_) { responseHeader += "\r\n"; } long object_age = time(NULL) - filemodtime; - responseHeader += std::string("Age: ") + std::to_string(object_age < 0 ? 0 : object_age); - if (!m_cache_control.empty()) { XrdOucCacheDirective directive(m_cache_control); @@ -2333,23 +2355,21 @@ int XrdHttpReq::PostProcessHTTPReq(bool final_) { prot->SendSimpleResp(504, "Gateway Timeout", responseHeader.empty() ? nullptr : responseHeader.c_str(), nullptr, 0, false); return -1; + } else if (object_age > directive.MaxAge()) { + m_unlink_entry = true; + object_age = 0; } } + responseHeader += std::string("Age: ") + std::to_string(object_age < 0 ? 0 : object_age); } if (rwOps.size() == 0) { - // Full file. - - if (m_transfer_encoding_chunked && m_trailer_headers) { - prot->StartChunkedResp(200, NULL, responseHeader.empty() ? NULL : responseHeader.c_str(), filesize, keepalive); - } else { - prot->SendSimpleResp(200, NULL, responseHeader.empty() ? NULL : responseHeader.c_str(), NULL, filesize, keepalive); - } - return 0; - } else - if (rwOps.size() == 1) { + m_get_response_length = filesize; + m_get_response = responseHeader; + } else if (rwOps.size() == 1) { // Only one read to perform - if (rwOps[0].byteend < 0) // The requested range was along the lines of "Range: 1234-", meaning we need to fill in the end + if (rwOps[0].byteend < 0) // The requested range was along the lines of "Range: 1234-", + // meaning we need to fill in the end rwOps[0].byteend = filesize - 1; int cnt = (rwOps[0].byteend - rwOps[0].bytestart + 1); char buf[64]; @@ -2362,8 +2382,8 @@ int XrdHttpReq::PostProcessHTTPReq(bool final_) { s += responseHeader.c_str(); } - prot->SendSimpleResp(206, NULL, (char *)s.c_str(), NULL, cnt, keepalive); - return 0; + m_get_response_length = cnt; + m_get_response = s.c_str(); } else if (rwOps.size() > 1) { // Multiple reads to perform, compose and send the header @@ -2388,34 +2408,38 @@ int XrdHttpReq::PostProcessHTTPReq(bool final_) { header += m_digest_header; } - prot->SendSimpleResp(206, NULL, header.c_str(), NULL, cnt, keepalive); - return 0; + m_get_response_length = cnt; + m_get_response = header; } - - - } else if (xrdresp != kXR_ok) { + if (m_unlink_entry) + return 0; + else + return SendGetResponse(); + } else { // xrdresp indicates an error occurred - // If it's a dir then we are in the wrong place and we did the wrong thing. - //if (xrderrcode == 3016) { - // fileflags &= kXR_isDir; - // reqstate--; - // return 0; - //} prot->SendSimpleResp(httpStatusCode, NULL, NULL, httpStatusText.c_str(), httpStatusText.length(), false); return -1; } - - // Remaining case: reqstate == 2 and we didn't ask for a digest (should be a read). + // Case should not be reachable + return -1; } - // fallthrough - default: //read or readv + case 3: // Unlink an entry from the cache + { + if (xrdresp != kXR_ok) { + prot->SendSimpleResp(httpStatusCode, NULL, NULL, + httpStatusText.c_str(), httpStatusText.length(), keepalive); + return -1; + } + return SendGetResponse(); + } + default: // read or readv { // If we are postprocessing a close, potentially send out informational trailers if ((ntohs(xrdreq.header.requestid) == kXR_close) || - ((reqstate == 3) && (ntohs(xrdreq.header.requestid) == kXR_readv))) + ((reqstate == 4) && (ntohs(xrdreq.header.requestid) == kXR_readv))) { if (m_transfer_encoding_chunked && m_trailer_headers) { @@ -2546,12 +2570,11 @@ int XrdHttpReq::PostProcessHTTPReq(bool final_) { this->iovN = 0; return 0; - } + } // end read or readv } // switch reqstate - - } + } // End handling of the open-read+-close case break; @@ -2994,8 +3017,12 @@ void XrdHttpReq::reset() { m_req_digest.clear(); m_resource_with_digest = ""; + m_digest_header.clear(); m_cache_control.clear(); + m_get_response.clear(); + m_get_response_length = 0; headerok = false; + m_unlink_entry = false; keepalive = true; length = 0; filesize = 0; diff --git a/src/XrdHttp/XrdHttpReq.hh b/src/XrdHttp/XrdHttpReq.hh index e12f30186f9..6e80b417d1f 100644 --- a/src/XrdHttp/XrdHttpReq.hh +++ b/src/XrdHttp/XrdHttpReq.hh @@ -132,6 +132,11 @@ private: */ void selectChecksum(const std::string & userDigest, std::string & selectedChecksum); + /** + * A helper function for sending the response headers for the GET. + */ + int SendGetResponse(); + public: XrdHttpReq(XrdHttpProtocol *protinstance) : keepalive(true) { @@ -216,6 +221,15 @@ public: /// Tells if we have finished reading the header bool headerok; + /// Set to true if we should flush the cache contents prior to + // proceeding + bool m_unlink_entry{false}; + + /// We have a helper function for sending the GET response (which may + // be deferred across an asynchronous callback). + // This is all the state needed to send it + std::string m_get_response; + long long m_get_response_length{0}; // This can be largely optimized... /// The original list of multiple reads to perform