diff --git a/src/tpc.cpp b/src/tpc.cpp index e0e66e0875e..033b349c6e5 100644 --- a/src/tpc.cpp +++ b/src/tpc.cpp @@ -165,7 +165,7 @@ class XrdHttpTPCState { } int Header(const std::string &header) { - printf("Recieved remote header (%d, %d): %s", m_recv_all_headers, m_recv_status_line, header.c_str()); + //printf("Recieved remote header (%d, %d): %s", m_recv_all_headers, m_recv_status_line, header.c_str()); if (m_recv_all_headers) { // This is the second request -- maybe processed a redirect? m_recv_all_headers = false; m_recv_status_line = false; @@ -175,7 +175,7 @@ class XrdHttpTPCState { std::string item; if (!std::getline(ss, item, ' ')) return 0; m_resp_protocol = item; - printf("\n\nResponse protocol: %s\n", m_resp_protocol.c_str()); + //printf("\n\nResponse protocol: %s\n", m_resp_protocol.c_str()); if (!std::getline(ss, item, ' ')) return 0; try { m_status_code = std::stol(item); @@ -201,6 +201,7 @@ class XrdHttpTPCState { return -1; } m_offset += retval; + //printf("Wrote a total of %ld bytes.\n", m_offset); return retval; } @@ -217,13 +218,14 @@ class XrdHttpTPCState { return -1; } m_offset += retval; + //printf("Read a total of %ld bytes.\n", m_offset); return retval; } bool m_push{true}; bool m_recv_status_line{false}; bool m_recv_all_headers{false}; - XrdSfsXferSize m_offset{0}; + off_t m_offset{0}; int m_status_code{-1}; std::unique_ptr m_fh; CURL *m_curl{nullptr}; @@ -250,7 +252,8 @@ class XrdHttpTPC : public XrdHttpExtHandler { if (header != req.headers.end()) { return ProcessPushReq(header->second, req); } - return req.SendSimpleResp(400, NULL, NULL, (char *)"No Source or Destination specified", 0); + m_log.Emsg("ProcessReq", "COPY verb requested but no source or destination specified."); + return req.SendSimpleResp(400, NULL, NULL, "No Source or Destination specified", 0); } /** @@ -332,12 +335,14 @@ class XrdHttpTPC : public XrdHttpExtHandler { #ifdef XRD_CHUNK_RESP int SendPerfMarker(XrdHttpExtReq &req, XrdHttpTPCState &state) { std::stringstream ss; - const std::string crlf = "\r\n"; + const std::string crlf = "\n"; ss << "Perf Marker" << crlf; - ss << " Timestamp: " << time(NULL) << crlf; - ss << " Stripe Index: 0" << crlf; - ss << " Stripe Bytes Transferred: " << state.BytesTransferred() << crlf; - ss << " Total Stripe Count: 1" << crlf; + ss << "Timestamp: " << time(NULL) << crlf; + ss << "Stripe Index: 0" << crlf; + for (int i=0; i<30; i++) + ss << "Stripe Bytes Transferred: " << state.BytesTransferred() << crlf; + ss << "Stripe Bytes Transferred: " << state.BytesTransferred() << crlf; + ss << "Total Stripe Count: 1" << crlf; ss << "End" << crlf; return req.ChunkResp(ss.str().c_str(), 0); @@ -378,10 +383,11 @@ class XrdHttpTPC : public XrdHttpExtHandler { // interrupt things to send back performance updates to the client. int running_handles = 1; time_t last_marker = 0; + CURLcode res = static_cast(-1); do { time_t now = time(NULL); time_t next_marker = last_marker + m_marker_period; - if (now > next_marker) { + if (now >= next_marker) { if (SendPerfMarker(req, state)) { curl_multi_remove_handle(multi_handle, curl); curl_easy_cleanup(curl); @@ -400,9 +406,23 @@ class XrdHttpTPC : public XrdHttpExtHandler { } else if (running_handles == 0) { break; } + //printf("There are %d running handles\n", running_handles); + + // Harvest any messages, looking for CURLMSG_DONE. + CURLMsg *msg; + do { + int msgq = 0; + msg = curl_multi_info_read(multi_handle, &msgq); + if (msg && (msg->msg == CURLMSG_DONE)) { + CURL *easy_handle = msg->easy_handle; + res = msg->data.result; + curl_multi_remove_handle(multi_handle, easy_handle); + curl_easy_cleanup(easy_handle); + } + } while (msg); int64_t max_sleep_time = next_marker - time(NULL); - if (max_sleep_time < 0) { + if (max_sleep_time <= 0) { continue; } int fd_count; @@ -424,7 +444,6 @@ class XrdHttpTPC : public XrdHttpExtHandler { } // Harvest any messages, looking for CURLMSG_DONE. - CURLcode res = static_cast(-1); CURLMsg *msg; do { int msgq = 0; @@ -490,6 +509,7 @@ class XrdHttpTPC : public XrdHttpExtHandler { #endif int ProcessPushReq(const std::string & resource, XrdHttpExtReq &req) { + m_log.Emsg("ProcessPushReq", "Starting a push request for resource", resource.c_str()); CURL *curl = curl_easy_init(); if (!curl) { char msg[] = "Failed to initialize internal transfer resources"; @@ -544,10 +564,10 @@ class XrdHttpTPC : public XrdHttpExtHandler { } std::string authz = GetAuthz(req); XrdSfsFileOpenMode mode = SFS_O_CREAT; - auto overwrite_header = req.headers.find("Overwrite"); + /*auto overwrite_header = req.headers.find("Overwrite"); if ((overwrite_header == req.headers.end()) || (overwrite_header->second == "T")) { mode = SFS_O_TRUNC|SFS_O_POSC; - } + }*/ int open_result = OpenWaitStall(*fh, req.resource, mode|SFS_O_WRONLY, 0644, req.GetSecEntity(), authz); if (SFS_REDIRECT == open_result) { @@ -705,7 +725,7 @@ class XrdHttpTPC : public XrdHttpExtHandler { return true; } - static constexpr int m_marker_period = 20; + static constexpr int m_marker_period = 5; bool m_desthttps{false}; static std::atomic m_monid; XrdSysError &m_log; @@ -734,7 +754,7 @@ XrdHttpExtHandler *XrdHttpGetExtHandler(XrdSysError *log, const char * config, c retval = new XrdHttpTPC(log, config, myEnv); } catch (std::runtime_error &re) { log->Emsg("Initialize", "Encountered a runtime failure when loading ", re.what()); - printf("Provided env vars: %p, XrdInet*: %p\n", myEnv, myEnv->GetPtr("XrdInet*")); + //printf("Provided env vars: %p, XrdInet*: %p\n", myEnv, myEnv->GetPtr("XrdInet*")); } return retval; }