From 200170c75f6da65c9a76750a0f62eccbd38d7110 Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Wed, 3 Jan 2018 22:59:22 -0600 Subject: [PATCH] Allow TPC code to determine remote size when pulling. Knowing the size of the remote resource is the first step in doing a multi-stream HTTP GET transfer. --- src/tpc.cpp | 85 ++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 78 insertions(+), 7 deletions(-) diff --git a/src/tpc.cpp b/src/tpc.cpp index f75310a73e1..305032dd5a4 100644 --- a/src/tpc.cpp +++ b/src/tpc.cpp @@ -12,6 +12,7 @@ #include #include +#include #include #include #include @@ -156,13 +157,17 @@ class XrdHttpTPCState { } } - /** - * Perform the curl-based transfer, responding periodically with transfer - * markers. - */ + off_t GetContentLength() const {return m_content_length;} int GetStatusCode() const {return m_status_code;} + void ResetAfterSize() { + m_status_code = -1; + m_content_length = -1; + m_recv_all_headers = false; + m_recv_status_line = false; + } + private: static size_t HeaderCB(char *buffer, size_t size, size_t nitems, void *userdata) { XrdHttpTPCState *obj = static_cast(userdata); @@ -189,8 +194,33 @@ class XrdHttpTPCState { return 0; } m_recv_status_line = true; + } else if (header.size() == 0 || header == "\n") { + m_recv_all_headers = true; + } + else if (header != "\r\n") { + // Parse the header + std::size_t found = header.find(":"); + if (found != std::string::npos) { + std::string header_name = header.substr(0, found); + std::transform(header_name.begin(), header_name.end(), header_name.begin(), ::tolower); + std::string header_value = header.substr(found+1); + if (header_name == "content-length") + { + try { + m_content_length = std::stoll(header_value); + } catch (...) { + // Header unparseable -- not a great sign, fail request. + //printf("Content-length header unparseable\n"); + return 0; + } + } + } else { + // Non-empty header that isn't the status line, but no ':' present -- + // malformed request? + //printf("Malformed header: %s\n", header.c_str()); + return 0; + } } - if (header.size() == 0 || header == "\n") {m_recv_all_headers = true;} return header.size(); } @@ -233,6 +263,7 @@ class XrdHttpTPCState { bool m_recv_all_headers{false}; off_t m_offset{0}; int m_status_code{-1}; + off_t m_content_length{-1}; std::unique_ptr m_fh; CURL *m_curl{nullptr}; struct curl_slist *m_headers{nullptr}; @@ -339,6 +370,36 @@ class XrdHttpTPC : public XrdHttpExtHandler { } #ifdef XRD_CHUNK_RESP + /** + * Determine size at remote end. + */ + int DetermineXferSize(CURL *curl, XrdHttpExtReq &req, XrdHttpTPCState &state, + bool &success) { + success = false; + curl_easy_setopt(curl, CURLOPT_NOBODY, 1); + CURLcode res; + res = curl_easy_perform(curl); + if (res == CURLE_HTTP_RETURNED_ERROR) { + m_log.Emsg("DetermineXferSize", "Remote server failed request", curl_easy_strerror(res)); + curl_easy_cleanup(curl); + return req.SendSimpleResp(500, nullptr, nullptr, const_cast(curl_easy_strerror(res)), 0); + } else if (state.GetStatusCode() >= 400) { + std::stringstream ss; + ss << "Remote side failed with status code " << state.GetStatusCode(); + m_log.Emsg("DetermineXferSize", "Remote server failed request", ss.str().c_str()); + curl_easy_cleanup(curl); + return req.SendSimpleResp(500, nullptr, nullptr, const_cast(ss.str().c_str()), 0); + } else if (res) { + m_log.Emsg("DetermineXferSize", "Curl failed", curl_easy_strerror(res)); + char msg[] = "Unknown internal transfer failure"; + curl_easy_cleanup(curl); + return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); + } + curl_easy_setopt(curl, CURLOPT_NOBODY, 0); + success = true; + return 0; + } + int SendPerfMarker(XrdHttpExtReq &req, XrdHttpTPCState &state) { std::stringstream ss; const std::string crlf = "\n"; @@ -599,9 +660,19 @@ class XrdHttpTPC : public XrdHttpExtHandler { state.CopyHeaders(req); #ifdef XRD_CHUNK_RESP - return RunCurlWithUpdates(curl, req, state, "ProcessPushReq"); + int result; + bool success; + if ((result = DetermineXferSize(curl, req, state, success)) || !success) { + return result; + } + std::stringstream ss; + ss << "Successfully determined remote size for pull request: " << state.GetContentLength(); + m_log.Emsg("ProcessPullReq", ss.str().c_str()); + state.ResetAfterSize(); + + return RunCurlWithUpdates(curl, req, state, "ProcessPullReq"); #else - return RunCurlBasic(curl, req, state, "ProcessPushReq"); + return RunCurlBasic(curl, req, state, "ProcessPullReq"); #endif }