From 0a9adbc941264e7a492cc943c20e0bcc9abf82d4 Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Fri, 5 Jan 2018 13:36:06 -0600 Subject: [PATCH] Add support for multi-streamed HTTP transfers. --- CMakeLists.txt | 2 +- src/multistream.cpp | 333 ++++++++++++++++++++++++++++++++++++++++++ src/state.cpp | 75 +++++++++- src/state.hh | 27 +++- src/stream.cpp | 15 +- src/stream.hh | 10 +- src/tpc.cpp | 43 +++--- src/tpc.hh | 8 +- tools/xrootd-test-tpc | 8 + 9 files changed, 489 insertions(+), 32 deletions(-) create mode 100644 src/multistream.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index f249a3bb4dd..5c60269dba7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -43,7 +43,7 @@ pkg_check_modules(CURL REQUIRED libcurl) include_directories(${XROOTD_INCLUDES} ${XROOTD_PRIVATE_INCLUDES} ${CURL_INCLUDE_DIRS}) -add_library(XrdHttpTPC SHARED src/tpc.cpp src/state.cpp src/configure.cpp src/stream.cpp) +add_library(XrdHttpTPC SHARED src/tpc.cpp src/state.cpp src/configure.cpp src/stream.cpp src/multistream.cpp) if ( XRD_CHUNK_RESP ) set_target_properties(XrdHttpTPC PROPERTIES COMPILE_DEFINITIONS "XRD_CHUNK_RESP" ) endif () diff --git a/src/multistream.cpp b/src/multistream.cpp new file mode 100644 index 00000000000..2779d143f19 --- /dev/null +++ b/src/multistream.cpp @@ -0,0 +1,333 @@ +/** + * Implementation of multi-stream HTTP transfers for the TPCHandler + */ + +#ifdef XRD_CHUNK_RESP + +#include "tpc.hh" +#include "state.hh" + +#include "XrdSys/XrdSysError.hh" + +#include + +#include +#include + +using namespace TPC; + +class CurlHandlerSetupError : public std::runtime_error { +public: + CurlHandlerSetupError(const std::string &msg) : + std::runtime_error(msg) + {} + virtual ~CurlHandlerSetupError() {} +}; + +namespace { +class MultiCurlHandler { +public: + MultiCurlHandler(std::vector &states) : + m_handle(curl_multi_init()), + m_states(states) + { + if (m_handle == nullptr) { + throw CurlHandlerSetupError("Failed to initialize a libcurl multi-handle"); + } + m_avail_handles.reserve(states.size()); + m_active_handles.reserve(states.size()); + for (State &state : states) { + m_avail_handles.push_back(state.GetHandle()); + } + } + + ~MultiCurlHandler() + { + if (!m_handle) {return;} + for (CURL * easy_handle : m_active_handles) { + curl_multi_remove_handle(m_handle, easy_handle); + curl_easy_cleanup(easy_handle); + } + for (auto & easy_handle : m_avail_handles) { + curl_easy_cleanup(easy_handle); + } + curl_multi_cleanup(m_handle); + } + + MultiCurlHandler(const MultiCurlHandler &) = delete; + + CURLM *Get() const {return m_handle;} + + void FinishCurlXfer(CURL *curl) { + CURLMcode mres = curl_multi_remove_handle(m_handle, curl); + if (mres) { + std::stringstream ss; + ss << "Failed to remove transfer from set: " + << curl_multi_strerror(mres); + throw std::runtime_error(ss.str()); + } + for (auto &state : m_states) { + if (curl == state.GetHandle()) { + state.ResetAfterRequest(); + break; + } + } + for (auto iter = m_active_handles.begin(); + iter != m_active_handles.end(); + ++iter) + { + if (*iter == curl) { + m_active_handles.erase(iter); + break; + } + } + m_avail_handles.push_back(curl); + } + + off_t StartTransfers(off_t current_offset, off_t content_length, size_t block_size, + int &running_handles) { + bool started_new_xfer = false; + do { + size_t xfer_size = std::min(content_length - current_offset, static_cast(block_size)); + if (xfer_size == 0) {return current_offset;} + if (!(started_new_xfer = StartTransfer(current_offset, xfer_size))) { + break; + } else { + running_handles += 
1; + } + current_offset += xfer_size; + } while (true); + return current_offset; + } + +private: + + bool StartTransfer(off_t offset, size_t size) { + if (!CanStartTransfer()) {return false;} + for (auto &handle : m_avail_handles) { + for (auto &state : m_states) { + if (state.GetHandle() == handle) { // This state object represents an idle handle. + state.SetTransferParameters(offset, size); + ActivateHandle(state); + return true; + } + } + } + return false; + } + + void ActivateHandle(State &state) { + CURL *curl = state.GetHandle(); + m_active_handles.push_back(curl); + CURLMcode mres; + mres = curl_multi_add_handle(m_handle, curl); + if (mres) { + std::stringstream ss; + ss << "Failed to add transfer to libcurl multi-handle" + << curl_multi_strerror(mres); + throw std::runtime_error(ss.str()); + } + for (auto iter = m_avail_handles.begin(); + iter != m_avail_handles.end(); + ++iter) + { + if (*iter == curl) { + m_avail_handles.erase(iter); + break; + } + } + } + + bool CanStartTransfer() const { + size_t idle_handles = m_avail_handles.size(); + size_t transfer_in_progress = 0; + for (auto &state : m_states) { + for (const auto &handle : m_active_handles) { + if (handle == state.GetHandle()) { + transfer_in_progress += state.BodyTransferInProgress(); + break; + } + } + } + if (!idle_handles) { + return false; + } + ssize_t available_buffers = m_states[0].AvailableBuffers(); + // To be conservative, set aside buffers for any transfers that have been activated + // but don't have their first responses back yet. + available_buffers -= (m_active_handles.size() - transfer_in_progress); + return available_buffers > 0; + } + + CURLM *m_handle; + std::vector m_avail_handles; + std::vector m_active_handles; + std::vector &m_states; +}; +} + + +int TPCHandler::RunCurlWithStreams(XrdHttpExtReq &req, State &state, + const char *log_prefix, size_t streams) +try +{ + int result; + bool success; + CURL *curl = state.GetHandle(); + if ((result = DetermineXferSize(curl, req, state, success)) || !success) { + return result; + } + off_t content_size = state.GetContentLength(); + off_t current_offset = 0; + + { + std::stringstream ss; + ss << "Successfully determined remote size for pull request: " << content_size; + m_log.Emsg("ProcessPullReq", ss.str().c_str()); + } + state.ResetAfterRequest(); + + std::vector handles; + handles.reserve(streams); + handles.emplace_back(std::move(state)); + for (size_t idx = 1; idx < streams; idx++) { + handles.emplace_back(handles[0].Duplicate()); // Makes a duplicate of the original state + } + + // Create the multi-handle and add in the current transfer to it. + MultiCurlHandler mch(handles); + CURLM *multi_handle = mch.Get(); + + // Start response to client prior to the first call to curl_multi_perform + int retval = req.StartChunkedResp(201, "Created", "Content-Type: text/plain"); + if (retval) { + return retval; + } + + // Start assigning transfers + int running_handles = 0; + current_offset = mch.StartTransfers(current_offset, content_size, m_block_size, running_handles); + + // Transfer loop: use curl to actually run the transfer, but periodically + // interrupt things to send back performance updates to the client. 
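The hand-out above is bounded by two things: the number of idle easy handles and the number of free stream buffers checked in CanStartTransfer. Ignoring the buffer check, the offset arithmetic reduces to handing consecutive block_size-sized ranges to handles until the file is exhausted. A minimal standalone sketch of that arithmetic, with illustrative names only (Range, assign_blocks) and no libcurl involved, assuming current_offset never exceeds content_length:

#include <algorithm>
#include <cstdio>
#include <sys/types.h>
#include <vector>

struct Range { off_t offset; size_t size; };

// Hand out up to max_handles consecutive blocks starting at current_offset.
// Assumes 0 <= current_offset <= content_length.
std::vector<Range> assign_blocks(off_t current_offset, off_t content_length,
                                 size_t block_size, size_t max_handles)
{
    std::vector<Range> assigned;
    while (assigned.size() < max_handles) {
        off_t xfer_size = std::min(content_length - current_offset,
                                   static_cast<off_t>(block_size));
        if (xfer_size == 0) break;  // nothing left to hand out
        assigned.push_back({current_offset, static_cast<size_t>(xfer_size)});
        current_offset += xfer_size;
    }
    return assigned;
}

int main()
{
    // Example: 100 MB file, 16 MB blocks (the default m_block_size),
    // 3 idle handles -> three 16 MB ranges starting at 0, 16 MB, 32 MB.
    for (const auto &r : assign_blocks(0, 100LL*1024*1024, 16*1024*1024, 3)) {
        std::printf("%lld-%lld\n", static_cast<long long>(r.offset),
                    static_cast<long long>(r.offset + r.size - 1));
    }
    return 0;
}

The final range is simply whatever remains, so block_size caps the size of each ranged request without constraining how many are issued overall.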
+ time_t last_marker = 0; + CURLcode res = static_cast(-1); + CURLMcode mres; + do { + time_t now = time(NULL); + time_t next_marker = last_marker + m_marker_period; + if (now >= next_marker) { + if (SendPerfMarker(req, current_offset)) { + return -1; + } + last_marker = now; + } + + mres = curl_multi_perform(multi_handle, &running_handles); + if (mres == CURLM_CALL_MULTI_PERFORM) { + // curl_multi_perform should be called again immediately. On newer + // versions of curl, this is no longer used. + continue; + } else if (mres != CURLM_OK) { + break; + } + + // Harvest any messages, looking for CURLMSG_DONE. + CURLMsg *msg; + do { + int msgq = 0; + msg = curl_multi_info_read(multi_handle, &msgq); + if (msg && (msg->msg == CURLMSG_DONE)) { + CURL *easy_handle = msg->easy_handle; + mch.FinishCurlXfer(easy_handle); + res = msg->data.result; + // If any requests fail, cut off the entire transfer. + if (res != CURLE_OK) { + break; + } + } + } while (msg); + if (res != -1 && res != CURLE_OK) { + break; + } + + if (running_handles < static_cast(streams)) { + // Issue new transfers if there is still pending work to do. + // Otherwise, continue running until there are no handles left. + if (current_offset != content_size) { + current_offset = mch.StartTransfers(current_offset, content_size, + m_block_size, running_handles); + } else if (running_handles == 0) { + break; + } + } + + int64_t max_sleep_time = next_marker - time(NULL); + if (max_sleep_time <= 0) { + continue; + } + int fd_count; + mres = curl_multi_wait(multi_handle, NULL, 0, max_sleep_time*1000, &fd_count); + if (mres != CURLM_OK) { + break; + } + } while (running_handles); + + if (mres != CURLM_OK) { + std::stringstream ss; + ss << "Internal libcurl multi-handle error: " + << curl_multi_strerror(mres); + throw std::runtime_error(ss.str()); + } + + // Harvest any messages, looking for CURLMSG_DONE. + CURLMsg *msg; + do { + int msgq = 0; + msg = curl_multi_info_read(multi_handle, &msgq); + if (msg && (msg->msg == CURLMSG_DONE)) { + CURL *easy_handle = msg->easy_handle; + mch.FinishCurlXfer(easy_handle); + res = msg->data.result; // Transfer result will be examined below. + } + } while (msg); + + if (res == -1) { // No transfers returned?!? + throw std::runtime_error("Internal state error in libcurl"); + } + + // Generate the final response back to the client. 
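The loop above is the usual libcurl multi-interface pattern: curl_multi_perform drives all active easy handles, curl_multi_info_read harvests CURLMSG_DONE messages whose data.result carries the per-transfer CURLcode, and curl_multi_wait sleeps until sockets become ready or the timeout expires. A stripped-down, self-contained version of the same pattern outside the XRootD plugin (the example.org URLs and the fixed one-second timeout are placeholders, and error handling is reduced to the minimum):

#include <curl/curl.h>
#include <cstdio>

int main()
{
    curl_global_init(CURL_GLOBAL_DEFAULT);
    CURLM *multi = curl_multi_init();

    const char *urls[] = {"http://example.org/a", "http://example.org/b"};
    for (const char *url : urls) {
        CURL *easy = curl_easy_init();
        curl_easy_setopt(easy, CURLOPT_URL, url);
        curl_multi_add_handle(multi, easy);
    }

    int running = 0;
    do {
        CURLMcode mres = curl_multi_perform(multi, &running);
        if (mres != CURLM_OK && mres != CURLM_CALL_MULTI_PERFORM) break;

        // Harvest finished transfers; msg->data.result is the per-transfer
        // CURLcode, the analogue of `res` in the handler's loop.
        int msgq = 0;
        while (CURLMsg *msg = curl_multi_info_read(multi, &msgq)) {
            if (msg->msg == CURLMSG_DONE) {
                std::printf("transfer finished: %s\n",
                            curl_easy_strerror(msg->data.result));
                CURL *easy = msg->easy_handle;
                curl_multi_remove_handle(multi, easy);
                curl_easy_cleanup(easy);
            }
        }

        if (running) {
            int fd_count = 0;
            curl_multi_wait(multi, NULL, 0, 1000 /* ms */, &fd_count);
        }
    } while (running);

    curl_multi_cleanup(multi);
    curl_global_cleanup();
    return 0;
}

Because data.result is reported per easy handle, a failure on any single ranged request is visible as soon as that handle completes, which is why the handler above cuts off the whole transfer on the first non-CURLE_OK result.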
+ std::stringstream ss; + if (res != CURLE_OK) { + m_log.Emsg(log_prefix, "request failed when processing", curl_easy_strerror(res)); + ss << "failure: " << curl_easy_strerror(res); + } else if (current_offset != content_size) { + ss << "failure: Internal logic error led to early abort"; + m_log.Emsg(log_prefix, "Internal logic error led to early abort"); + } else if (state.GetStatusCode() >= 400) { + ss << "failure: Remote side failed with status code " << state.GetStatusCode(); + m_log.Emsg(log_prefix, "Remote server failed request", ss.str().c_str()); + } else { + ss << "success: Created"; + } + + if ((retval = req.ChunkResp(ss.str().c_str(), 0))) { + return retval; + } + return req.ChunkResp(nullptr, 0); +} +catch (CurlHandlerSetupError e) { + m_log.Emsg(log_prefix, e.what()); + return req.SendSimpleResp(500, nullptr, nullptr, e.what(), 0); +} catch (std::runtime_error e) { + m_log.Emsg(log_prefix, e.what()); + std::stringstream ss; + ss << "failure: " << e.what(); + int retval; + if ((retval = req.ChunkResp(ss.str().c_str(), 0))) { + return retval; + } + return req.ChunkResp(nullptr, 0); +} + +#endif // XRD_CHUNK_RESP diff --git a/src/state.cpp b/src/state.cpp index faa6abb67c1..ace8a9eaa12 100644 --- a/src/state.cpp +++ b/src/state.cpp @@ -1,6 +1,7 @@ #include #include +#include #include "XrdHttp/XrdHttpExtHandler.hh" #include "XrdSfs/XrdSfsInterface.hh" @@ -17,10 +18,35 @@ State::~State() { if (m_headers) { curl_slist_free_all(m_headers); m_headers = nullptr; - curl_easy_setopt(m_curl, CURLOPT_HTTPHEADER, m_headers); + if (m_curl) {curl_easy_setopt(m_curl, CURLOPT_HTTPHEADER, m_headers);} } } +State::State(State && other) noexcept : + m_push(other.m_push), + m_recv_status_line(other.m_recv_status_line), + m_recv_all_headers(other.m_recv_all_headers), + m_offset(other.m_offset), + m_start_offset(other.m_start_offset), + m_status_code(other.m_status_code), + m_content_length(other.m_content_length), + m_stream(other.m_stream), + m_curl(other.m_curl), + m_headers(other.m_headers), + m_headers_copy(std::move(other.m_headers_copy)), + m_resp_protocol(std::move(m_resp_protocol)) +{ + curl_easy_setopt(m_curl, CURLOPT_HEADERDATA, this); + if (m_push) { + curl_easy_setopt(m_curl, CURLOPT_READDATA, this); + } else { + curl_easy_setopt(m_curl, CURLOPT_WRITEDATA, this); + } + other.m_headers_copy.clear(); + other.m_curl = nullptr; + other.m_headers = nullptr; +} + bool State::InstallHandlers(CURL *curl) { curl_easy_setopt(curl, CURLOPT_USERAGENT, "xrootd-tpc/" XRDTPC_VERSION); curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, &State::HeaderCB); @@ -45,16 +71,18 @@ bool State::InstallHandlers(CURL *curl) { * Handle the 'Copy-Headers' feature */ void State::CopyHeaders(XrdHttpExtReq &req) { - struct curl_slist *list = NULL; + struct curl_slist *list = nullptr; for (auto &hdr : req.headers) { if (hdr.first == "Copy-Header") { list = curl_slist_append(list, hdr.second.c_str()); + m_headers_copy.emplace_back(hdr.second); } // Note: len("TransferHeader") == 14 if (!hdr.first.compare(0, 14, "TransferHeader")) { std::stringstream ss; ss << hdr.first.substr(14) << ": " << hdr.second; list = curl_slist_append(list, ss.str().c_str()); + m_headers_copy.emplace_back(ss.str()); } } if (list != nullptr) { @@ -63,7 +91,8 @@ void State::CopyHeaders(XrdHttpExtReq &req) { } } -void State::ResetAfterSize() { +void State::ResetAfterRequest() { + m_offset = 0; m_status_code = -1; m_content_length = -1; m_recv_all_headers = false; @@ -128,7 +157,9 @@ int State::Header(const std::string &header) { size_t 
State::WriteCB(void *buffer, size_t size, size_t nitems, void *userdata) { State *obj = static_cast(userdata); - if (obj->GetStatusCode() < 0) {return 0;} // malformed request - got body before headers. + if (obj->GetStatusCode() < 0) { + return 0; + } // malformed request - got body before headers. if (obj->GetStatusCode() >= 400) {return 0;} // Status indicates failure. return obj->Write(static_cast(buffer), size*nitems); } @@ -139,7 +170,6 @@ int State::Write(char *buffer, size_t size) { return -1; } m_offset += retval; - //printf("Wrote a total of %ld bytes.\n", m_offset); return retval; } @@ -159,3 +189,38 @@ int State::Read(char *buffer, size_t size) { //printf("Read a total of %ld bytes.\n", m_offset); return retval; } + +State State::Duplicate() { + CURL *curl = curl_easy_duphandle(m_curl); + if (!curl) { + throw std::runtime_error("Failed to duplicate existing curl handle."); + } + + State state(0, m_stream, curl, m_push); + + if (m_headers) { + state.m_headers_copy.reserve(m_headers_copy.size()); + for (auto &header : m_headers_copy) { + state.m_headers = curl_slist_append(state.m_headers, header.c_str()); + state.m_headers_copy.push_back(header); + } + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, nullptr); + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, state.m_headers); + } + + return std::move(state); +} + +void State::SetTransferParameters(off_t offset, size_t size) { + m_start_offset = offset; + m_offset = 0; + m_content_length = size; + std::stringstream ss; + ss << offset << "-" << (offset+size-1); + curl_easy_setopt(m_curl, CURLOPT_RANGE, ss.str().c_str()); +} + +int State::AvailableBuffers() const +{ + return m_stream.AvailableBuffers(); +} diff --git a/src/state.hh b/src/state.hh index 01f46b88168..5d52ec39cdd 100644 --- a/src/state.hh +++ b/src/state.hh @@ -5,6 +5,7 @@ */ #include +#include // Forward dec'ls class XrdSfsFile; @@ -16,6 +17,10 @@ class Stream; class State { public: + + // Note that we are "borrowing" a reference to the curl handle; + // it is not owned / freed by the State object. However, we use it + // as if there's only one handle per State. State (off_t start_offset, Stream &stream, CURL *curl, bool push) : m_push(push), m_start_offset(start_offset), @@ -27,6 +32,8 @@ public: ~State(); + void SetTransferParameters(off_t offset, size_t size); + void CopyHeaders(XrdHttpExtReq &req); off_t BytesTransferred() const {return m_offset;} @@ -35,7 +42,22 @@ public: int GetStatusCode() const {return m_status_code;} - void ResetAfterSize(); + void ResetAfterRequest(); + + CURL *GetHandle() const {return m_curl;} + + int AvailableBuffers() const; + + // Returns true if at least one byte of the response has been received, + // but not the entire contents of the response. + bool BodyTransferInProgress() const {return m_offset && (m_offset != m_content_length);} + + // Duplicate the current state; all settings are copied over, but those + // related to the transient state are reset as if from a constructor. + State Duplicate(); + + State(const State&) = delete; + State(State &&) noexcept; private: bool InstallHandlers(CURL *curl); @@ -53,12 +75,13 @@ private: bool m_recv_status_line{false}; // whether we have received a status line in the response from the remote host. bool m_recv_all_headers{false}; // true if we have seen the end of headers. off_t m_offset{0}; // number of bytes we have received. - const off_t m_start_offset{0}; // offset where we started in the file. + off_t m_start_offset{0}; // offset where we started in the file. 
int m_status_code{-1}; // status code from HTTP response. off_t m_content_length{-1}; // value of Content-Length header, if we received one. Stream &m_stream; // stream corresponding to this transfer. CURL *m_curl{nullptr}; // libcurl handle struct curl_slist *m_headers{nullptr}; // any headers we set as part of the libcurl request. + std::vector m_headers_copy; // Copies of custom headers. std::string m_resp_protocol; // Response protocol in the HTTP status line. }; diff --git a/src/stream.cpp b/src/stream.cpp index 6deef39685e..b2e55304330 100644 --- a/src/stream.cpp +++ b/src/stream.cpp @@ -22,6 +22,9 @@ Stream::Write(off_t offset, const char *buf, size_t size) { bool buffer_accepted = false; int retval = size; + if (offset < m_offset) { + return SFS_ERROR; + } if (offset == m_offset) { retval = m_fh->write(offset, buf, size); buffer_accepted = true; @@ -34,14 +37,15 @@ Stream::Write(off_t offset, const char *buf, size_t size) return retval; } } - //printf("Performing stream buffer accounting. Available buffers: %lu, total buffers %lu.\n", m_avail_count, m_buffers.size()); // Even if we already accepted the current data, always // iterate through available buffers and try to write as // much out to disk as possible. - Entry *avail_entry = nullptr; + Entry *avail_entry; bool buffer_was_written; + size_t avail_count = 0; do { - m_avail_count = 0; + avail_count = 0; + avail_entry = nullptr; buffer_was_written = false; for (Entry &entry : m_buffers) { // Always try to dump from memory. @@ -50,13 +54,14 @@ Stream::Write(off_t offset, const char *buf, size_t size) } if (entry.Available()) { // Empty buffer if (!avail_entry) {avail_entry = &entry;} - m_avail_count ++; + avail_count ++; } else if (!buffer_accepted && entry.Accept(offset, buf, size)) { buffer_accepted = true; } } - } while ((m_avail_count != m_buffers.size()) && buffer_was_written); + } while ((avail_count != m_buffers.size()) && buffer_was_written); + m_avail_count = avail_count; if (!buffer_accepted) { // No place for this data in allocated buffers if (!avail_entry) { // No available buffers to allocate. diff --git a/src/stream.hh b/src/stream.hh index 78d6bef028f..f123ec42301 100644 --- a/src/stream.hh +++ b/src/stream.hh @@ -23,7 +23,7 @@ public: : m_avail_count(max_blocks), m_fh(std::move(fh)) { - m_buffers.reserve(max_blocks); + //m_buffers.reserve(max_blocks); for (size_t idx=0; idx < max_blocks; idx++) { m_buffers.emplace_back(buffer_size); } @@ -37,6 +37,8 @@ public: int Write(off_t offset, const char *buffer, size_t size); + size_t AvailableBuffers() const {return m_avail_count;} + private: class Entry { @@ -45,6 +47,9 @@ private: m_capacity(capacity) {} + Entry(const Entry&) = delete; + Entry(Entry&&) = default; + bool Available() const {return m_offset == -1;} int Write(Stream &stream) { @@ -78,6 +83,9 @@ private: // Finally, do the copy. 
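// (Accept appends the chunk to this buffer's contiguous region; an empty
// buffer, marked by m_offset == -1, adopts the chunk's offset just below,
// so Stream::Write can flush the region to disk once the file offset
// catches up to it.)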
memcpy(&m_buffer[0] + m_size, buf, size); m_size += size; + if (m_offset == -1) { + m_offset = offset; + } return true; } diff --git a/src/tpc.cpp b/src/tpc.cpp index 3c2704cc849..e2b4a92478b 100644 --- a/src/tpc.cpp +++ b/src/tpc.cpp @@ -189,14 +189,13 @@ int TPCHandler::DetermineXferSize(CURL *curl, XrdHttpExtReq &req, State &state, return 0; } -int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, State &state) { +int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, off_t bytes_transferred) { std::stringstream ss; const std::string crlf = "\n"; ss << "Perf Marker" << crlf; ss << "Timestamp: " << time(NULL) << crlf; ss << "Stripe Index: 0" << crlf; - ss << "Stripe Bytes Transferred: " << state.BytesTransferred() << crlf; - ss << "Stripe Bytes Transferred: " << state.BytesTransferred() << crlf; + ss << "Stripe Bytes Transferred: " << bytes_transferred << crlf; ss << "Total Stripe Count: 1" << crlf; ss << "End" << crlf; @@ -243,7 +242,7 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state, time_t now = time(NULL); time_t next_marker = last_marker + m_marker_period; if (now >= next_marker) { - if (SendPerfMarker(req, state)) { + if (SendPerfMarker(req, state.BytesTransferred())) { curl_multi_remove_handle(multi_handle, curl); curl_easy_cleanup(curl); curl_multi_cleanup(multi_handle); @@ -426,10 +425,26 @@ int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req) } std::string authz = GetAuthz(req); XrdSfsFileOpenMode mode = SFS_O_CREAT; - /*auto overwrite_header = req.headers.find("Overwrite"); + auto overwrite_header = req.headers.find("Overwrite"); if ((overwrite_header == req.headers.end()) || (overwrite_header->second == "T")) { mode = SFS_O_TRUNC|SFS_O_POSC; - }*/ + } + int streams = 1; + { + auto streams_header = req.headers.find("X-Number-Of-Streams"); + if (streams_header != req.headers.end()) { + int stream_req = -1; + try { + stream_req = std::stol(streams_header->second); + } catch (...) 
{ // Handled below + } + if (stream_req < 1 || stream_req > 100) { + char msg[] = "Invalid request for number of streams"; + return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); + } + streams = stream_req; + } + } int open_result = OpenWaitStall(*fh, req.resource, mode|SFS_O_WRONLY, 0644, req.GetSecEntity(), authz); @@ -452,22 +467,16 @@ int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req) curl_easy_setopt(curl, CURLOPT_CAPATH, m_cadir.c_str()); } curl_easy_setopt(curl, CURLOPT_URL, resource.c_str()); - Stream stream(std::move(fh), 0, 0); + Stream stream(std::move(fh), streams, m_block_size); State state(0, stream, curl, false); state.CopyHeaders(req); #ifdef XRD_CHUNK_RESP - int result; - bool success; - if ((result = DetermineXferSize(curl, req, state, success)) || !success) { - return result; + if (streams > 1) { + return RunCurlWithStreams(req, state, "ProcessPullReq", streams); + } else { + return RunCurlWithUpdates(curl, req, state, "ProcessPullReq"); } - std::stringstream ss; - ss << "Successfully determined remote size for pull request: " << state.GetContentLength(); - m_log.Emsg("ProcessPullReq", ss.str().c_str()); - state.ResetAfterSize(); - - return RunCurlWithUpdates(curl, req, state, "ProcessPullReq"); #else return RunCurlBasic(curl, req, state, "ProcessPullReq"); #endif diff --git a/src/tpc.hh b/src/tpc.hh index 163ba77c956..cfdd9896301 100644 --- a/src/tpc.hh +++ b/src/tpc.hh @@ -39,10 +39,15 @@ private: int DetermineXferSize(CURL *curl, XrdHttpExtReq &req, TPC::State &state, bool &success); - int SendPerfMarker(XrdHttpExtReq &req, TPC::State &state); + int SendPerfMarker(XrdHttpExtReq &req, off_t bytes_transferred); + // Perform the libcurl transfer, periodically sending back chunked updates. int RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, TPC::State &state, const char *log_prefix); + + // Experimental multi-stream version of RunCurlWithUpdates + int RunCurlWithStreams(XrdHttpExtReq &req, TPC::State &state, + const char *log_prefix, size_t streams); #else int RunCurlBasic(CURL *curl, XrdHttpExtReq &req, TPC::State &state, const char *log_prefix); @@ -56,6 +61,7 @@ private: bool Configure(const char *configfn, XrdOucEnv *myEnv); static constexpr int m_marker_period = 5; + static constexpr size_t m_block_size = 16*1024*1024; bool m_desthttps{false}; std::string m_cadir; static std::atomic m_monid; diff --git a/tools/xrootd-test-tpc b/tools/xrootd-test-tpc index 851618ab390..2508444a786 100755 --- a/tools/xrootd-test-tpc +++ b/tools/xrootd-test-tpc @@ -14,11 +14,17 @@ def parse_args(): parser.add_argument("--push", dest="mode", action="store_const", const="push", help="Use push-mode for transfer (source manages transfer)", default="auto") parser.add_argument("--pull", dest="mode", action="store_const", const="pull", help="Use pull-mode for transfer (destination manages transfer)") parser.add_argument("--no-overwrite", dest="overwrite", action="store_false", default=True, help="Disable overwrite of existing files.") + parser.add_argument("--streams", dest="streams", help="Allow multiple streams", default=1, + type=int) parser.add_argument("src") parser.add_argument("dest") args = parser.parse_args() + if args.streams < 1: + print >> sys.stderr, "Invalid number of streams specified: %d" % args.streams + sys.exit(1) + if not args.token and (not args.src_token or not args.dest_token): if 'SCITOKEN' in os.environ and os.path.exists(os.environ['SCITOKEN']): args.token = os.environ['SCITOKEN'] @@ -63,6 +69,8 @@ def main(): headers['Overwrite'] = 
'T'
     else:
         headers['Overwrite'] = 'F'
+    if args.streams > 1:
+        headers['X-Number-Of-Streams'] = str(args.streams)
     mode = args.mode
     if mode == "auto":
         mode = determine_mode(args)
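For reference, the multi-stream pull that the tool above sets up can also be driven directly with libcurl. The sketch below assumes the usual HTTP-TPC convention of sending an HTTP COPY to the destination with a Source header (pull mode), plus the X-Number-Of-Streams header introduced in this patch; the URLs and the bearer token are placeholders:

// Minimal sketch of a pull-mode COPY request asking for multiple streams.
#include <curl/curl.h>
#include <cstdio>

int main()
{
    curl_global_init(CURL_GLOBAL_DEFAULT);
    CURL *curl = curl_easy_init();
    if (!curl) return 1;

    curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "COPY");
    curl_easy_setopt(curl, CURLOPT_URL, "https://dest.example/store/file");

    struct curl_slist *headers = NULL;
    headers = curl_slist_append(headers, "Source: https://src.example/store/file");
    headers = curl_slist_append(headers, "Overwrite: T");
    headers = curl_slist_append(headers, "X-Number-Of-Streams: 4");
    headers = curl_slist_append(headers, "Authorization: Bearer PLACEHOLDER");
    curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);

    // The destination performs the transfer itself and streams back a
    // chunked "201 Created" body with periodic performance markers and a
    // final success/failure line.
    CURLcode res = curl_easy_perform(curl);
    if (res != CURLE_OK)
        std::fprintf(stderr, "COPY failed: %s\n", curl_easy_strerror(res));

    curl_slist_free_all(headers);
    curl_easy_cleanup(curl);
    curl_global_cleanup();
    return res == CURLE_OK ? 0 : 1;
}

Note that the handler rejects stream counts outside the range 1-100 with a 500 response, so the header value should stay within that range.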