From 7da01d330f53ca02721271adcb7650c9d89c0f1e Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Wed, 20 Dec 2017 20:04:54 -0600 Subject: [PATCH] Add initial performance marker support. If Xrootd supports chunked transfer encoding, then send performance markers along with the COPY request response. --- src/tpc.cpp | 226 ++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 184 insertions(+), 42 deletions(-) diff --git a/src/tpc.cpp b/src/tpc.cpp index 85d6af43674..e0e66e0875e 100644 --- a/src/tpc.cpp +++ b/src/tpc.cpp @@ -111,6 +111,8 @@ class XrdHttpTPCState { m_fh->close(); } + off_t BytesTransferred() const {return m_offset;} + bool InstallHandlers(CURL *curl) { curl_easy_setopt(curl, CURLOPT_USERAGENT, "xrootd-tpc/" XRDTPC_VERSION); curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, &XrdHttpTPCState::HeaderCB); @@ -127,6 +129,7 @@ class XrdHttpTPCState { curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &XrdHttpTPCState::WriteCB); curl_easy_setopt(curl, CURLOPT_WRITEDATA, this); } + curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); //curl_easy_setopt(curl, CURLOPT_FAILONERROR, 1L); return true; } @@ -147,6 +150,11 @@ class XrdHttpTPCState { } } + /** + * Perform the curl-based transfer, responding periodically with transfer + * markers. + */ + int GetStatusCode() const {return m_status_code;} private: @@ -157,8 +165,7 @@ class XrdHttpTPCState { } int Header(const std::string &header) { - // TODO: Handle status codes appropriately. - printf("Recieved remote header: %s\n", header.c_str()); + printf("Recieved remote header (%d, %d): %s", m_recv_all_headers, m_recv_status_line, header.c_str()); if (m_recv_all_headers) { // This is the second request -- maybe processed a redirect? m_recv_all_headers = false; m_recv_status_line = false; @@ -168,7 +175,7 @@ class XrdHttpTPCState { std::string item; if (!std::getline(ss, item, ' ')) return 0; m_resp_protocol = item; - printf("Response protocol: %s\n", m_resp_protocol.c_str()); + printf("\n\nResponse protocol: %s\n", m_resp_protocol.c_str()); if (!std::getline(ss, item, ' ')) return 0; try { m_status_code = std::stol(item); @@ -177,7 +184,7 @@ class XrdHttpTPCState { } m_recv_status_line = true; } - if (header.size() == 0) {m_recv_all_headers = true;} + if (header.size() == 0 || header == "\n") {m_recv_all_headers = true;} return header.size(); } @@ -322,6 +329,166 @@ class XrdHttpTPC : public XrdHttpExtHandler { return open_result; } +#ifdef XRD_CHUNK_RESP + int SendPerfMarker(XrdHttpExtReq &req, XrdHttpTPCState &state) { + std::stringstream ss; + const std::string crlf = "\r\n"; + ss << "Perf Marker" << crlf; + ss << " Timestamp: " << time(NULL) << crlf; + ss << " Stripe Index: 0" << crlf; + ss << " Stripe Bytes Transferred: " << state.BytesTransferred() << crlf; + ss << " Total Stripe Count: 1" << crlf; + ss << "End" << crlf; + + return req.ChunkResp(ss.str().c_str(), 0); + } + + int RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, XrdHttpTPCState &state, + const char *log_prefix) { + + // Create the multi-handle and add in the current transfer to it. + CURLM *multi_handle = curl_multi_init(); + if (!multi_handle) { + m_log.Emsg(log_prefix, "Failed to initialize a libcurl multi-handle"); + char msg[] = "Failed to initialize internal server memory"; + curl_easy_cleanup(curl); + return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); + } + + CURLMcode mres; + mres = curl_multi_add_handle(multi_handle, curl); + if (mres) { + m_log.Emsg(log_prefix, "Failed to add transfer to libcurl multi-handle", + curl_multi_strerror(mres)); + char msg[] = "Failed to initialize internal server handle"; + curl_easy_cleanup(curl); + curl_multi_cleanup(multi_handle); + return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); + } + + // Start response to client prior to the first call to curl_multi_perform + int retval = req.StartChunkedResp(201, "Created", "Content-Type: text/plain"); + if (retval) { + curl_easy_cleanup(curl); + curl_multi_cleanup(multi_handle); + return retval; + } + + // Transfer loop: use curl to actually run the transfer, but periodically + // interrupt things to send back performance updates to the client. + int running_handles = 1; + time_t last_marker = 0; + do { + time_t now = time(NULL); + time_t next_marker = last_marker + m_marker_period; + if (now > next_marker) { + if (SendPerfMarker(req, state)) { + curl_multi_remove_handle(multi_handle, curl); + curl_easy_cleanup(curl); + curl_multi_cleanup(multi_handle); + return -1; + } + last_marker = now; + } + mres = curl_multi_perform(multi_handle, &running_handles); + if (mres == CURLM_CALL_MULTI_PERFORM) { + // curl_multi_perform should be called again immediately. On newer + // versions of curl, this is no longer used. + continue; + } else if (mres != CURLM_OK) { + break; + } else if (running_handles == 0) { + break; + } + + int64_t max_sleep_time = next_marker - time(NULL); + if (max_sleep_time < 0) { + continue; + } + int fd_count; + mres = curl_multi_wait(multi_handle, NULL, 0, max_sleep_time*1000, &fd_count); + if (mres != CURLM_OK) { + break; + } + } while (running_handles); + + if (mres != CURLM_OK) { + m_log.Emsg(log_prefix, "Internal libcurl multi-handle error", + curl_multi_strerror(mres)); + char msg[] = "Internal server error due to libcurl"; + curl_multi_remove_handle(multi_handle, curl); + curl_easy_cleanup(curl); + + curl_multi_cleanup(multi_handle); + return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); + } + + // Harvest any messages, looking for CURLMSG_DONE. + CURLcode res = static_cast(-1); + CURLMsg *msg; + do { + int msgq = 0; + msg = curl_multi_info_read(multi_handle, &msgq); + if (msg && (msg->msg == CURLMSG_DONE)) { + CURL *easy_handle = msg->easy_handle; + res = msg->data.result; + curl_multi_remove_handle(multi_handle, easy_handle); + curl_easy_cleanup(easy_handle); + } + } while (msg); + + if (res == -1) { // No transfers returned?!? + curl_multi_remove_handle(multi_handle, curl); + curl_easy_cleanup(curl); + curl_multi_cleanup(multi_handle); + char msg[] = "Internal state error in libcurl"; + m_log.Emsg(log_prefix, msg); + return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); + } + curl_multi_cleanup(multi_handle); + + // Generate the final response back to the client. + std::stringstream ss; + if (res != CURLE_OK) { + m_log.Emsg(log_prefix, "Remote server failed request", curl_easy_strerror(res)); + ss << "failure: " << curl_easy_strerror(res); + } else if (state.GetStatusCode() >= 400) { + ss << "failure: Remote side failed with status code " << state.GetStatusCode(); + m_log.Emsg(log_prefix, "Remote server failed request", ss.str().c_str()); + } else { + ss << "success: Created"; + } + + if ((retval = req.ChunkResp(ss.str().c_str(), 0))) { + return retval; + } + return req.ChunkResp(nullptr, 0); + } +#else + int RunCurlBasic(CURL *curl, XrdHttpExtReq &req, XrdHttpTPCState &state, + const char *log_prefix) { + CURLcode res; + res = curl_easy_perform(curl); + curl_easy_cleanup(curl); + if (res == CURLE_HTTP_RETURNED_ERROR) { + m_log.Emsg(log_prefix, "Remote server failed request", curl_easy_strerror(res)); + return req.SendSimpleResp(500, nullptr, nullptr, const_cast(curl_easy_strerror(res)), 0); + } else if (state.GetStatusCode() >= 400) { + std::stringstream ss; + ss << "Remote side failed with status code " << state.GetStatusCode(); + m_log.Emsg(log_prefix, "Remote server failed request", ss.str().c_str()); + return req.SendSimpleResp(500, nullptr, nullptr, const_cast(ss.str().c_str()), 0); + } else if (res) { + m_log.Emsg(log_prefix, "Curl failed", curl_easy_strerror(res)); + char msg[] = "Unknown internal transfer failure"; + return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); + } else { + char msg[] = "Created"; + return req.SendSimpleResp(201, nullptr, nullptr, msg, 0); + } + } +#endif + int ProcessPushReq(const std::string & resource, XrdHttpExtReq &req) { CURL *curl = curl_easy_init(); if (!curl) { @@ -351,28 +518,16 @@ class XrdHttpTPC : public XrdHttpExtHandler { return resp_result; } - CURLcode res; curl_easy_setopt(curl, CURLOPT_URL, resource.c_str()); XrdHttpTPCState state(std::move(fh), curl, true); state.CopyHeaders(req); - res = curl_easy_perform(curl); - if (res == CURLE_HTTP_RETURNED_ERROR) { - m_log.Emsg("ProcessPushReq", "Remote server failed request", curl_easy_strerror(res)); - return req.SendSimpleResp(500, nullptr, nullptr, const_cast(curl_easy_strerror(res)), 0); - } else if (state.GetStatusCode() >= 400) { - std::stringstream ss; - ss << "Remote side failed with status code " << state.GetStatusCode(); - m_log.Emsg("ProcessPushReq", "Remote server failed request", ss.str().c_str()); - return req.SendSimpleResp(500, nullptr, nullptr, const_cast(ss.str().c_str()), 0); - } else if (res) { - m_log.Emsg("ProcessPushReq", "Curl failed", curl_easy_strerror(res)); - char msg[] = "Unknown internal transfer failure"; - return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); - } else { - char msg[] = "Created"; - return req.SendSimpleResp(201, nullptr, nullptr, msg, 0); - } + +#ifdef XRD_CHUNK_RESP + return RunCurlWithUpdates(curl, req, state, "ProcessPushReq"); +#else + return RunCurlBasic(curl, req, state, "ProcessPushReq"); +#endif } int ProcessPullReq(const std::string &resource, XrdHttpExtReq &req) { @@ -409,29 +564,15 @@ class XrdHttpTPC : public XrdHttpExtHandler { fh->close(); return resp_result; } - - CURLcode res; curl_easy_setopt(curl, CURLOPT_URL, resource.c_str()); - XrdHttpTPCState state(std::move(fh), curl, false); state.CopyHeaders(req); - res = curl_easy_perform(curl); - if (res == CURLE_HTTP_RETURNED_ERROR) { - m_log.Emsg("ProcessPullReq", "Remote server failed request", curl_easy_strerror(res)); - return req.SendSimpleResp(500, nullptr, nullptr, const_cast(curl_easy_strerror(res)), 0); - } else if (state.GetStatusCode() >= 400) { - std::stringstream ss; - ss << "Remote side failed with status code " << state.GetStatusCode(); - m_log.Emsg("ProcessPushReq", "Remote server failed request", ss.str().c_str()); - return req.SendSimpleResp(500, nullptr, nullptr, const_cast(ss.str().c_str()), 0); - } else if (res) { - m_log.Emsg("ProcessPullReq", "Curl failed", curl_easy_strerror(res)); - char msg[] = "Unknown internal transfer failure"; - return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); - } else { - char msg[] = "Created"; - return req.SendSimpleResp(201, nullptr, nullptr, msg, 0); - } + +#ifdef XRD_CHUNK_RESP + return RunCurlWithUpdates(curl, req, state, "ProcessPushReq"); +#else + return RunCurlBasic(curl, req, state, "ProcessPushReq"); +#endif } bool ConfigureFSLib(XrdOucStream &Config, std::string &path1, bool &path1_alt, std::string &path2, bool &path2_alt) { @@ -564,6 +705,7 @@ class XrdHttpTPC : public XrdHttpExtHandler { return true; } + static constexpr int m_marker_period = 20; bool m_desthttps{false}; static std::atomic m_monid; XrdSysError &m_log;