diff --git a/src/XrdTpc/XrdTpcMultistream.cc b/src/XrdTpc/XrdTpcMultistream.cc index 942bb3e92e6..d4387293ac0 100644 --- a/src/XrdTpc/XrdTpcMultistream.cc +++ b/src/XrdTpc/XrdTpcMultistream.cc @@ -264,7 +264,7 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state, time_t now = time(NULL); time_t next_marker = last_marker + m_marker_period; if (now >= next_marker) { - if (SendPerfMarker(req, current_offset)) { + if (SendPerfMarker(req, handles, current_offset)) { return -1; } last_marker = now; diff --git a/src/XrdTpc/XrdTpcState.cc b/src/XrdTpc/XrdTpcState.cc index 6dd858934d3..0dedf243e24 100644 --- a/src/XrdTpc/XrdTpcState.cc +++ b/src/XrdTpc/XrdTpcState.cc @@ -253,3 +253,27 @@ bool State::Finalize() return m_stream->Finalize(); } +std::string State::GetConnectionDescription() +{ + char *curl_ip = NULL; + CURLcode rc = curl_easy_getinfo(m_curl, CURLINFO_PRIMARY_IP, &curl_ip); + if ((rc != CURLE_OK) || !curl_ip) { + return ""; + } + long curl_port = 0; + rc = curl_easy_getinfo(m_curl, CURLINFO_PRIMARY_PORT, &curl_port); + if ((rc != CURLE_OK) || !curl_port) { + return ""; + } + std::stringstream ss; + // libcurl returns IPv6 addresses of the form: + // 2600:900:6:1301:5054:ff:fe0b:9cba:8000 + // However the HTTP-TPC spec says to use the form + // [2600:900:6:1301:5054:ff:fe0b:9cba:8000] + // Hence, we add '[' and ']' whenever a ':' is seen. + if (NULL == strchr(curl_ip, ':')) + ss << "tcp:" << curl_ip << ":" << curl_port; + else + ss << "tcp:[" << curl_ip << "]:" << curl_port; + return ss.str(); +} diff --git a/src/XrdTpc/XrdTpcState.hh b/src/XrdTpc/XrdTpcState.hh index d710e5150e4..3ad28e53ff2 100644 --- a/src/XrdTpc/XrdTpcState.hh +++ b/src/XrdTpc/XrdTpcState.hh @@ -90,6 +90,12 @@ public: // not all buffers have been reordered by the underlying stream. bool Finalize(); + // Retrieve the description of the remote connection; is of the form: + // tcp:129.93.3.4:1234 + // tcp:[2600:900:6:1301:268a:7ff:fef6:a590]:2345 + // This is meant to facilitate the monitoring via the performance markers. + std::string GetConnectionDescription(); + private: bool InstallHandlers(CURL *curl); diff --git a/src/XrdTpc/XrdTpcTPC.cc b/src/XrdTpc/XrdTpcTPC.cc index 37385479673..42707876ebd 100644 --- a/src/XrdTpc/XrdTpcTPC.cc +++ b/src/XrdTpc/XrdTpcTPC.cc @@ -238,7 +238,39 @@ int TPCHandler::DetermineXferSize(CURL *curl, XrdHttpExtReq &req, State &state, return 0; } -int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, off_t bytes_transferred) { +int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, TPC::State &state) { + std::stringstream ss; + const std::string crlf = "\n"; + ss << "Perf Marker" << crlf; + ss << "Timestamp: " << time(NULL) << crlf; + ss << "Stripe Index: 0" << crlf; + ss << "Stripe Bytes Transferred: " << state.BytesTransferred() << crlf; + ss << "Total Stripe Count: 1" << crlf; + // Include the TCP connection associated with this transfer; used by + // the TPC client for monitoring purposes. + std::string desc = state.GetConnectionDescription(); + if (!desc.empty()) + ss << "RemoteConnections: " << desc << crlf; + ss << "End" << crlf; + + return req.ChunkResp(ss.str().c_str(), 0); +} + +int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, std::vector &state, + off_t bytes_transferred) +{ + // The 'performance marker' format is largely derived from how GridFTP works + // (e.g., the concept of `Stripe` is not quite so relevant here). See: + // https://twiki.cern.ch/twiki/bin/view/LCG/HttpTpcTechnical + // Example marker: + // Perf Marker\n + // Timestamp: 1537788010\n + // Stripe Index: 0\n + // Stripe Bytes Transferred: 238745\n + // Total Stripe Count: 1\n + // RemoteConnections: tcp:129.93.3.4:1234,tcp:[2600:900:6:1301:268a:7ff:fef6:a590]:2345\n + // End\n + // std::stringstream ss; const std::string crlf = "\n"; ss << "Perf Marker" << crlf; @@ -246,6 +278,21 @@ int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, off_t bytes_transferred) { ss << "Stripe Index: 0" << crlf; ss << "Stripe Bytes Transferred: " << bytes_transferred << crlf; ss << "Total Stripe Count: 1" << crlf; + // Build a list of TCP connections associated with this transfer; used by + // the TPC client for monitoring purposes. + bool first = true; + std::stringstream ss2; + for (std::vector::const_iterator iter = state.begin(); + iter != state.end(); iter++) + { + std::string desc = (*iter)->GetConnectionDescription(); + if (!desc.empty()) { + ss2 << (first ? "" : ",") << desc; + first = false; + } + } + if (!first) + ss << "RemoteConnections: " << ss2.str() << crlf; ss << "End" << crlf; return req.ChunkResp(ss.str().c_str(), 0); @@ -291,7 +338,7 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state, time_t now = time(NULL); time_t next_marker = last_marker + m_marker_period; if (now >= next_marker) { - if (SendPerfMarker(req, state.BytesTransferred())) { + if (SendPerfMarker(req, state)) { curl_multi_remove_handle(multi_handle, curl); curl_easy_cleanup(curl); curl_multi_cleanup(multi_handle); diff --git a/src/XrdTpc/XrdTpcTPC.hh b/src/XrdTpc/XrdTpcTPC.hh index 794c90e21dc..0d4b9db0b7a 100644 --- a/src/XrdTpc/XrdTpcTPC.hh +++ b/src/XrdTpc/XrdTpcTPC.hh @@ -42,7 +42,13 @@ private: int DetermineXferSize(CURL *curl, XrdHttpExtReq &req, TPC::State &state, bool &success); - int SendPerfMarker(XrdHttpExtReq &req, off_t bytes_transferred); + // Send a 'performance marker' back to the TPC client, informing it of our + // progress. The TPC client will use this information to determine whether + // the transfer is making sufficient progress and/or other monitoring info + // (such as whether the transfer is happening over IPv4, IPv6, or both). + int SendPerfMarker(XrdHttpExtReq &req, TPC::State &state); + int SendPerfMarker(XrdHttpExtReq &req, std::vector &state, + off_t bytes_transferred); // Perform the libcurl transfer, periodically sending back chunked updates. int RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, TPC::State &state,