From 725e070f60bdc97bb1d7b5dc8e2ec3be4ade31dd Mon Sep 17 00:00:00 2001 From: Cedric Caffy Date: Thu, 13 Oct 2022 11:52:52 +0200 Subject: [PATCH] [XrdHttpTPC] TPC-Pull: passing the source file size information to the OFS layer via opaque parameter "oss.asize" Implemented by doing a HEAD request on the Source server's file path and getting the content-length out of it. This solves the issue #1754 --- src/XrdTpc/XrdTpcState.cc | 34 ++++++------ src/XrdTpc/XrdTpcState.hh | 28 +++++++++- src/XrdTpc/XrdTpcTPC.cc | 110 ++++++++++++++++++++++++++------------ src/XrdTpc/XrdTpcTPC.hh | 4 +- 4 files changed, 123 insertions(+), 53 deletions(-) diff --git a/src/XrdTpc/XrdTpcState.cc b/src/XrdTpc/XrdTpcState.cc index 9dd2ae70e84..f5412752401 100644 --- a/src/XrdTpc/XrdTpcState.cc +++ b/src/XrdTpc/XrdTpcState.cc @@ -38,12 +38,14 @@ void State::Move(State &other) m_headers = other.m_headers; m_headers_copy = other.m_headers_copy; m_resp_protocol = other.m_resp_protocol; - + m_is_transfer_state = other.m_is_transfer_state; curl_easy_setopt(m_curl, CURLOPT_HEADERDATA, this); - if (m_push) { - curl_easy_setopt(m_curl, CURLOPT_READDATA, this); - } else { - curl_easy_setopt(m_curl, CURLOPT_WRITEDATA, this); + if (m_is_transfer_state) { + if (m_push) { + curl_easy_setopt(m_curl, CURLOPT_READDATA, this); + } else { + curl_easy_setopt(m_curl, CURLOPT_WRITEDATA, this); + } } other.m_headers_copy.clear(); other.m_curl = NULL; @@ -56,17 +58,19 @@ bool State::InstallHandlers(CURL *curl) { curl_easy_setopt(curl, CURLOPT_USERAGENT, "xrootd-tpc/" XrdVERSION); curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, &State::HeaderCB); curl_easy_setopt(curl, CURLOPT_HEADERDATA, this); - if (m_push) { - curl_easy_setopt(curl, CURLOPT_UPLOAD, 1); - curl_easy_setopt(curl, CURLOPT_READFUNCTION, &State::ReadCB); - curl_easy_setopt(curl, CURLOPT_READDATA, this); - struct stat buf; - if (SFS_OK == m_stream->Stat(&buf)) { - curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, buf.st_size); + if(m_is_transfer_state) { + 
if (m_push) { + curl_easy_setopt(curl, CURLOPT_UPLOAD, 1); + curl_easy_setopt(curl, CURLOPT_READFUNCTION, &State::ReadCB); + curl_easy_setopt(curl, CURLOPT_READDATA, this); + struct stat buf; + if (SFS_OK == m_stream->Stat(&buf)) { + curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, buf.st_size); + } + } else { + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &State::WriteCB); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, this); } - } else { - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &State::WriteCB); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, this); } curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); diff --git a/src/XrdTpc/XrdTpcState.hh b/src/XrdTpc/XrdTpcState.hh index 394279b69fd..664ef46a3f6 100644 --- a/src/XrdTpc/XrdTpcState.hh +++ b/src/XrdTpc/XrdTpcState.hh @@ -31,9 +31,31 @@ public: m_content_length(-1), m_stream(NULL), m_curl(NULL), - m_headers(NULL) + m_headers(NULL), + m_is_transfer_state(true) {} + /** + * Don't use that constructor if you want to do some transfers. + * @param curl the curl handle + */ + State(CURL * curl): + m_push(true), + m_recv_status_line(false), + m_recv_all_headers(false), + m_offset(0), + m_start_offset(0), + m_status_code(-1), + m_error_code(0), + m_content_length(-1), + m_stream(NULL), + m_curl(curl), + m_headers(NULL), + m_is_transfer_state(false) + { + InstallHandlers(curl); + } + // Note that we are "borrowing" a reference to the curl handle; // it is not owned / freed by the State object. However, we use it // as if there's only one handle per State. @@ -48,7 +70,8 @@ public: m_content_length(-1), m_stream(&stream), m_curl(curl), - m_headers(NULL) + m_headers(NULL), + m_is_transfer_state(true) { InstallHandlers(curl); } @@ -143,6 +166,7 @@ private: std::vector m_headers_copy; // Copies of custom headers. std::string m_resp_protocol; // Response protocol in the HTTP status line. std::string m_error_buf; // Any error associated with a response. 
+ bool m_is_transfer_state; // If set to true, this state will be used to perform some transfers }; }; diff --git a/src/XrdTpc/XrdTpcTPC.cc b/src/XrdTpc/XrdTpcTPC.cc index f35fda1f083..5befc42222b 100644 --- a/src/XrdTpc/XrdTpcTPC.cc +++ b/src/XrdTpc/XrdTpcTPC.cc @@ -144,33 +144,41 @@ if (purpose == CURLSOCKTYPE_IPCXN && clientp) // One special key is `authz`; this is always stripped out and copied to the Authorization // header (which will later be used for XrdSecEntity). The latter copy is only done if // the Authorization header is not already present. -static std::string prepareURL(XrdHttpExtReq &req) { - std::map::const_iterator iter = req.headers.find("xrd-http-query"); - if (iter == req.headers.end() || iter->second.empty()) {return req.resource;} - - auto has_authz_header = req.headers.find("Authorization") != req.headers.end(); - - std::istringstream requestStream(iter->second); - std::string token; - std::stringstream result; - bool found_first_header = false; - while (std::getline(requestStream, token, '&')) { - if (token.empty()) { - continue; - } else if (!strncmp(token.c_str(), "authz=", 6)) { - if (!has_authz_header) { - req.headers["Authorization"] = token.substr(6); - has_authz_header = true; - } - } else if (!found_first_header) { - result << "?" 
<< token; - found_first_header = true; - } else { - result << "&" << token; +// +// hasSetOpaque will be set to true if at least one opaque data has been set in the URL that is returned, +// false otherwise +static std::string prepareURL(XrdHttpExtReq &req, bool & hasSetOpaque) { + std::map::const_iterator iter = req.headers.find("xrd-http-query"); + if (iter == req.headers.end() || iter->second.empty()) {hasSetOpaque = false; return req.resource;} /* no query string: honour the "false otherwise" contract instead of leaving the out-parameter unassigned */ + + auto has_authz_header = req.headers.find("Authorization") != req.headers.end(); + + std::istringstream requestStream(iter->second); + std::string token; + std::stringstream result; + bool found_first_header = false; + while (std::getline(requestStream, token, '&')) { + if (token.empty()) { + continue; + } else if (!strncmp(token.c_str(), "authz=", 6)) { + if (!has_authz_header) { + req.headers["Authorization"] = token.substr(6); + has_authz_header = true; + } + } else if (!found_first_header) { + result << "?" << token; + found_first_header = true; + } else { + result << "&" << token; + } } - } + hasSetOpaque = found_first_header; + return req.resource + result.str().c_str(); +} - return req.resource + result.str().c_str(); +static std::string prepareURL(XrdHttpExtReq &req) { + bool foundHeader = false; /* value is discarded; initialised so no indeterminate bool is ever handed around */ + return prepareURL(req,foundHeader); } /******************************************************************************/ @@ -401,9 +409,11 @@ int TPCHandler::OpenWaitStall(XrdSfsFile &fh, const std::string &resource, opaque = resource.substr(pos + 1); } - // Append the authz information - opaque += (opaque.empty() ? "" : "&"); - opaque += authz; + // Append the authz information if there are some + if(!authz.empty()) { + opaque += (opaque.empty() ?
"" : "&"); + opaque += authz; + } open_result = fh.open(path.c_str(), mode, openMode, &sec, opaque.c_str()); if ((open_result == SFS_STALL) || (open_result == SFS_STARTED)) { @@ -422,11 +432,14 @@ int TPCHandler::OpenWaitStall(XrdSfsFile &fh, const std::string &resource, /******************************************************************************/ #ifdef XRD_CHUNK_RESP + + + /** * Determine size at remote end. */ int TPCHandler::DetermineXferSize(CURL *curl, XrdHttpExtReq &req, State &state, - bool &success, TPCLogRecord &rec) { + bool &success, TPCLogRecord &rec, bool shouldReturnErrorToClient) { success = false; curl_easy_setopt(curl, CURLOPT_NOBODY, 1); CURLcode res; @@ -436,20 +449,20 @@ int TPCHandler::DetermineXferSize(CURL *curl, XrdHttpExtReq &req, State &state, ss << "Remote server failed request: " << curl_easy_strerror(res); rec.status = 500; logTransferEvent(LogMask::Error, rec, "SIZE_FAIL", ss.str()); - return req.SendSimpleResp(rec.status, NULL, NULL, const_cast(curl_easy_strerror(res)), 0); + return shouldReturnErrorToClient ? req.SendSimpleResp(rec.status, NULL, NULL, const_cast(curl_easy_strerror(res)), 0) : -1; } else if (state.GetStatusCode() >= 400) { std::stringstream ss; ss << "Remote side failed with status code " << state.GetStatusCode(); rec.status = 500; logTransferEvent(LogMask::Error, rec, "SIZE_FAIL", ss.str()); - return req.SendSimpleResp(rec.status, NULL, NULL, const_cast(ss.str().c_str()), 0); + return shouldReturnErrorToClient ? req.SendSimpleResp(rec.status, NULL, NULL, const_cast(ss.str().c_str()), 0): -1; } else if (res) { std::stringstream ss; ss << "HTTP library failed: " << curl_easy_strerror(res); rec.status = 500; logTransferEvent(LogMask::Error, rec, "SIZE_FAIL", ss.str()); char msg[] = "Unknown internal transfer failure"; - return req.SendSimpleResp(rec.status, NULL, NULL, msg, 0); + return shouldReturnErrorToClient ? 
req.SendSimpleResp(rec.status, NULL, NULL, msg, 0) : -1; } std::stringstream ss; ss << "Successfully determined remote size for pull request: " @@ -459,6 +472,17 @@ int TPCHandler::DetermineXferSize(CURL *curl, XrdHttpExtReq &req, State &state, success = true; return 0; } + +int TPCHandler::GetContentLengthTPCPull(CURL *curl, XrdHttpExtReq &req, uint64_t &contentLength, bool & success, TPCLogRecord &rec) { + State state(curl); + int result; + //In case we cannot get the content length, we don't return anything to the client + if ((result = DetermineXferSize(curl, req, state, success, rec, false)) || !success) { + return result; + } + contentLength = state.GetContentLength(); + return result; +} /******************************************************************************/ /* XRD_CHUNK_RESP: */ @@ -936,9 +960,26 @@ int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req) } } rec.streams = streams; - std::string full_url = prepareURL(req); + bool hasSetOpaque = false; /* default to "no opaque data" so the separator chosen below ('?' vs '&') never depends on an indeterminate value */ + std::string full_url = prepareURL(req, hasSetOpaque); std::string authz = GetAuthz(req); - + curl_easy_setopt(curl, CURLOPT_URL, resource.c_str()); +#ifdef XRD_CHUNK_RESP + { + //Get the content-length of the source file and pass it to the OSS layer + //during the open + uint64_t sourceFileContentLength = 0; + bool success; + TPCLogRecord getContentLengthRec; + GetContentLengthTPCPull(curl, req, sourceFileContentLength, success, getContentLengthRec); + if(success) { + //In the case we cannot get the information from the source server (offline or other error) + //we just don't add the size information to the opaque of the local file to open + full_url += hasSetOpaque ?
"&" : "?"; + full_url += "oss.asize=" + std::to_string(sourceFileContentLength); + } + } +#endif int open_result = OpenWaitStall(*fh, full_url, mode|SFS_O_WRONLY, 0644, req.GetSecEntity(), authz); if (SFS_REDIRECT == open_result) { @@ -959,7 +1000,6 @@ int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req) return resp_result; } ConfigureCurlCA(curl); - curl_easy_setopt(curl, CURLOPT_URL, resource.c_str()); Stream stream(std::move(fh), streams * m_pipelining_multiplier, streams > 1 ? m_block_size : m_small_block_size, m_log); State state(0, stream, curl, false); state.CopyHeaders(req); diff --git a/src/XrdTpc/XrdTpcTPC.hh b/src/XrdTpc/XrdTpcTPC.hh index f8a63fddc9b..5b8090d5bf9 100644 --- a/src/XrdTpc/XrdTpcTPC.hh +++ b/src/XrdTpc/XrdTpcTPC.hh @@ -96,7 +96,9 @@ private: #ifdef XRD_CHUNK_RESP int DetermineXferSize(CURL *curl, XrdHttpExtReq &req, TPC::State &state, - bool &success, TPCLogRecord &); + bool &success, TPCLogRecord &, bool shouldReturnErrorToClient = true); + + int GetContentLengthTPCPull(CURL *curl, XrdHttpExtReq &req, uint64_t & contentLength, bool & success, TPCLogRecord &rec); // Send a 'performance marker' back to the TPC client, informing it of our // progress. The TPC client will use this information to determine whether