From f93b5a15720844893de51331a30c9fb521f92ffa Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Thu, 7 Jun 2018 16:05:47 -0500 Subject: [PATCH] Downgrade the XrdTpc code from C++11 to C++03. --- src/XrdTpc/CMakeLists.txt | 2 +- src/XrdTpc/configure.cpp | 30 ++++---- src/XrdTpc/multistream.cpp | 136 +++++++++++++++++++++++++------------ src/XrdTpc/state.cpp | 82 ++++++++++++---------- src/XrdTpc/state.hh | 54 +++++++++++---- src/XrdTpc/stream.cpp | 26 ++++--- src/XrdTpc/stream.hh | 32 ++++++--- src/XrdTpc/tpc.cpp | 85 +++++++++++++---------- src/XrdTpc/tpc.hh | 28 +++++--- 9 files changed, 299 insertions(+), 176 deletions(-) diff --git a/src/XrdTpc/CMakeLists.txt b/src/XrdTpc/CMakeLists.txt index 5c60269dba7..6dfc680a672 100644 --- a/src/XrdTpc/CMakeLists.txt +++ b/src/XrdTpc/CMakeLists.txt @@ -43,7 +43,7 @@ pkg_check_modules(CURL REQUIRED libcurl) include_directories(${XROOTD_INCLUDES} ${XROOTD_PRIVATE_INCLUDES} ${CURL_INCLUDE_DIRS}) -add_library(XrdHttpTPC SHARED src/tpc.cpp src/state.cpp src/configure.cpp src/stream.cpp src/multistream.cpp) +add_library(XrdHttpTPC SHARED src/tpc.cpp src/state.cpp src/configure.cpp src/stream.cpp src/multistream.cpp src/XrdTpcCurlMulti.cpp) if ( XRD_CHUNK_RESP ) set_target_properties(XrdHttpTPC PROPERTIES COMPILE_DEFINITIONS "XRD_CHUNK_RESP" ) endif () diff --git a/src/XrdTpc/configure.cpp b/src/XrdTpc/configure.cpp index cf9915cdac6..74182f49cda 100644 --- a/src/XrdTpc/configure.cpp +++ b/src/XrdTpc/configure.cpp @@ -18,27 +18,27 @@ using namespace TPC; static XrdSfsFileSystem *load_sfs(void *handle, bool alt, XrdSysError &log, const std::string &libpath, const char *configfn, XrdOucEnv &myEnv, XrdSfsFileSystem *prior_sfs) { - XrdSfsFileSystem *sfs = nullptr; + XrdSfsFileSystem *sfs = NULL; if (alt) { - auto ep = (XrdSfsFileSystem *(*)(XrdSfsFileSystem *, XrdSysLogger *, const char *, XrdOucEnv *)) + XrdSfsFileSystem2_t ep = (XrdSfsFileSystem *(*)(XrdSfsFileSystem *, XrdSysLogger *, const char *, XrdOucEnv *)) (dlsym(handle, "XrdSfsGetFileSystem2")); - if (ep == nullptr) { + if (ep == NULL) { log.Emsg("Config", "Failed to load XrdSfsGetFileSystem2 from library ", libpath.c_str(), dlerror()); - return nullptr; + return NULL; } sfs = ep(prior_sfs, log.logger(), configfn, &myEnv); } else { - auto ep = (XrdSfsFileSystem *(*)(XrdSfsFileSystem *, XrdSysLogger *, const char *)) - (dlsym(nullptr, "XrdSfsGetFileSystem")); - if (ep == nullptr) { + XrdSfsFileSystem_t ep = (XrdSfsFileSystem *(*)(XrdSfsFileSystem *, XrdSysLogger *, const char *)) + (dlsym(NULL, "XrdSfsGetFileSystem")); + if (ep == NULL) { log.Emsg("Config", "Failed to load XrdSfsGetFileSystem from library ", libpath.c_str(), dlerror()); - return nullptr; + return NULL; } sfs = ep(prior_sfs, log.logger(), configfn); } if (!sfs) { log.Emsg("Config", "Failed to initialize filesystem library for TPC handler from ", libpath.c_str()); - return nullptr; + return NULL; } return sfs; } @@ -138,10 +138,10 @@ bool TPCHandler::Configure(const char *configfn, XrdOucEnv *myEnv) } Config.Close(); - XrdSfsFileSystem *base_sfs = nullptr; + XrdSfsFileSystem *base_sfs = NULL; if (path1 == "default") { m_log.Emsg("Config", "Loading the default filesystem"); - base_sfs = XrdSfsGetDefaultFileSystem(nullptr, m_log.logger(), configfn, myEnv); + base_sfs = XrdSfsGetDefaultFileSystem(NULL, m_log.logger(), configfn, myEnv); m_log.Emsg("Config", "Finished loading the default filesystem"); } else { char resolvePath[2048]; @@ -151,17 +151,17 @@ bool TPCHandler::Configure(const char *configfn, XrdOucEnv *myEnv) return false; } m_handle_base = dlopen(resolvePath, RTLD_LOCAL|RTLD_NOW); - if (m_handle_base == nullptr) { + if (m_handle_base == NULL) { m_log.Emsg("Config", "Failed to base plugin ", resolvePath, dlerror()); return false; } - base_sfs = load_sfs(m_handle_base, path1_alt, m_log, path1, configfn, *myEnv, nullptr); + base_sfs = load_sfs(m_handle_base, path1_alt, m_log, path1, configfn, *myEnv, NULL); } if (!base_sfs) { m_log.Emsg("Config", "Failed to initialize filesystem library for TPC handler from ", path1.c_str()); return false; } - XrdSfsFileSystem *chained_sfs = nullptr; + XrdSfsFileSystem *chained_sfs = NULL; if (!path2.empty()) { char resolvePath[2048]; bool usedAltPath{true}; @@ -170,7 +170,7 @@ bool TPCHandler::Configure(const char *configfn, XrdOucEnv *myEnv) return false; } m_handle_chained = dlopen(resolvePath, RTLD_LOCAL|RTLD_NOW); - if (m_handle_chained == nullptr) { + if (m_handle_chained == NULL) { m_log.Emsg("Config", "Failed to chained plugin ", resolvePath, dlerror()); return false; } diff --git a/src/XrdTpc/multistream.cpp b/src/XrdTpc/multistream.cpp index 2779d143f19..0afaab30771 100644 --- a/src/XrdTpc/multistream.cpp +++ b/src/XrdTpc/multistream.cpp @@ -6,6 +6,7 @@ #include "tpc.hh" #include "state.hh" +#include "XrdTpcCurlMulti.hh" #include "XrdSys/XrdSysError.hh" @@ -14,6 +15,7 @@ #include #include + using namespace TPC; class CurlHandlerSetupError : public std::runtime_error { @@ -21,35 +23,42 @@ class CurlHandlerSetupError : public std::runtime_error { CurlHandlerSetupError(const std::string &msg) : std::runtime_error(msg) {} - virtual ~CurlHandlerSetupError() {} + + virtual ~CurlHandlerSetupError() throw () {} }; namespace { class MultiCurlHandler { public: - MultiCurlHandler(std::vector &states) : + MultiCurlHandler(std::vector &states) : m_handle(curl_multi_init()), m_states(states) { - if (m_handle == nullptr) { + if (m_handle == NULL) { throw CurlHandlerSetupError("Failed to initialize a libcurl multi-handle"); } m_avail_handles.reserve(states.size()); m_active_handles.reserve(states.size()); - for (State &state : states) { - m_avail_handles.push_back(state.GetHandle()); + for (std::vector::const_iterator state_iter = states.begin(); + state_iter != states.end(); + state_iter++) { + m_avail_handles.push_back((*state_iter)->GetHandle()); } } ~MultiCurlHandler() { if (!m_handle) {return;} - for (CURL * easy_handle : m_active_handles) { - curl_multi_remove_handle(m_handle, easy_handle); - curl_easy_cleanup(easy_handle); + for (std::vector::const_iterator it = m_active_handles.begin(); + it != m_active_handles.end(); + it++) { + curl_multi_remove_handle(m_handle, *it); + curl_easy_cleanup(*it); } - for (auto & easy_handle : m_avail_handles) { - curl_easy_cleanup(easy_handle); + for (std::vector::const_iterator it = m_avail_handles.begin(); + it != m_avail_handles.end(); + it++) { + curl_easy_cleanup(*it); } curl_multi_cleanup(m_handle); } @@ -66,13 +75,15 @@ class MultiCurlHandler { << curl_multi_strerror(mres); throw std::runtime_error(ss.str()); } - for (auto &state : m_states) { - if (curl == state.GetHandle()) { - state.ResetAfterRequest(); + for (std::vector::iterator state_iter = m_states.begin(); + state_iter != m_states.end(); + state_iter++) { + if (curl == (*state_iter)->GetHandle()) { + (*state_iter)->ResetAfterRequest(); break; } } - for (auto iter = m_active_handles.begin(); + for (std::vector::iterator iter = m_active_handles.begin(); iter != m_active_handles.end(); ++iter) { @@ -104,11 +115,15 @@ class MultiCurlHandler { bool StartTransfer(off_t offset, size_t size) { if (!CanStartTransfer()) {return false;} - for (auto &handle : m_avail_handles) { - for (auto &state : m_states) { - if (state.GetHandle() == handle) { // This state object represents an idle handle. - state.SetTransferParameters(offset, size); - ActivateHandle(state); + for (std::vector::const_iterator handle_it = m_avail_handles.begin(); + handle_it != m_avail_handles.end(); + handle_it++) { + for (std::vector::iterator state_it = m_states.begin(); + state_it != m_states.end(); + state_it++) { + if ((*state_it)->GetHandle() == *handle_it) { // This state object represents an idle handle. + (*state_it)->SetTransferParameters(offset, size); + ActivateHandle(**state_it); return true; } } @@ -141,10 +156,14 @@ class MultiCurlHandler { bool CanStartTransfer() const { size_t idle_handles = m_avail_handles.size(); size_t transfer_in_progress = 0; - for (auto &state : m_states) { - for (const auto &handle : m_active_handles) { - if (handle == state.GetHandle()) { - transfer_in_progress += state.BodyTransferInProgress(); + for (std::vector::const_iterator state_iter = m_states.begin(); + state_iter != m_states.end(); + state_iter++) { + for (std::vector::const_iterator handle_iter = m_active_handles.begin(); + handle_iter != m_active_handles.end(); + handle_iter++) { + if (*handle_iter == (*state_iter)->GetHandle()) { + transfer_in_progress += (*state_iter)->BodyTransferInProgress(); break; } } @@ -152,7 +171,7 @@ class MultiCurlHandler { if (!idle_handles) { return false; } - ssize_t available_buffers = m_states[0].AvailableBuffers(); + ssize_t available_buffers = m_states[0]->AvailableBuffers(); // To be conservative, set aside buffers for any transfers that have been activated // but don't have their first responses back yet. available_buffers -= (m_active_handles.size() - transfer_in_progress); @@ -162,14 +181,14 @@ class MultiCurlHandler { CURLM *m_handle; std::vector m_avail_handles; std::vector m_active_handles; - std::vector &m_states; + std::vector &m_states; }; } -int TPCHandler::RunCurlWithStreams(XrdHttpExtReq &req, State &state, - const char *log_prefix, size_t streams) -try +int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state, + const char *log_prefix, size_t streams, + std::vector handles) { int result; bool success; @@ -187,11 +206,11 @@ try } state.ResetAfterRequest(); - std::vector handles; handles.reserve(streams); - handles.emplace_back(std::move(state)); + handles.push_back(new State()); + handles[0]->Move(state); for (size_t idx = 1; idx < streams; idx++) { - handles.emplace_back(handles[0].Duplicate()); // Makes a duplicate of the original state + handles.push_back(handles[0]->Duplicate()); } // Create the multi-handle and add in the current transfer to it. @@ -267,7 +286,13 @@ try continue; } int fd_count; - mres = curl_multi_wait(multi_handle, NULL, 0, max_sleep_time*1000, &fd_count); +#ifdef HAS_CURL_MULTI + mres = curl_multi_wait(multi_handle, NULL, 0, max_sleep_time*1000, + &fd_count); +#else + mres = curl_multi_wait_impl(multi_handle, max_sleep_time*1000, + &fd_count); +#endif if (mres != CURLM_OK) { break; } @@ -314,20 +339,45 @@ try if ((retval = req.ChunkResp(ss.str().c_str(), 0))) { return retval; } - return req.ChunkResp(nullptr, 0); + return req.ChunkResp(NULL, 0); } -catch (CurlHandlerSetupError e) { - m_log.Emsg(log_prefix, e.what()); - return req.SendSimpleResp(500, nullptr, nullptr, e.what(), 0); -} catch (std::runtime_error e) { - m_log.Emsg(log_prefix, e.what()); - std::stringstream ss; - ss << "failure: " << e.what(); - int retval; - if ((retval = req.ChunkResp(ss.str().c_str(), 0))) { +int TPCHandler::RunCurlWithStreams(XrdHttpExtReq &req, State &state, + const char *log_prefix, size_t streams) +{ + std::vector handles; + try { + int retval = RunCurlWithStreamsImpl(req, state, log_prefix, streams, handles); + for (std::vector::iterator state_iter = handles.begin(); + state_iter != handles.end(); + state_iter++) { + delete *state_iter; + } return retval; + } catch (CurlHandlerSetupError e) { + for (std::vector::iterator state_iter = handles.begin(); + state_iter != handles.end(); + state_iter++) { + delete *state_iter; + } + + m_log.Emsg(log_prefix, e.what()); + return req.SendSimpleResp(500, NULL, NULL, e.what(), 0); + } catch (std::runtime_error e) { + for (std::vector::iterator state_iter = handles.begin(); + state_iter != handles.end(); + state_iter++) { + delete *state_iter; + } + + m_log.Emsg(log_prefix, e.what()); + std::stringstream ss; + ss << "failure: " << e.what(); + int retval; + if ((retval = req.ChunkResp(ss.str().c_str(), 0))) { + return retval; + } + return req.ChunkResp(NULL, 0); } - return req.ChunkResp(nullptr, 0); } #endif // XRD_CHUNK_RESP diff --git a/src/XrdTpc/state.cpp b/src/XrdTpc/state.cpp index c2c234ebf55..6856400b0ad 100644 --- a/src/XrdTpc/state.cpp +++ b/src/XrdTpc/state.cpp @@ -14,28 +14,31 @@ using namespace TPC; + State::~State() { if (m_headers) { curl_slist_free_all(m_headers); - m_headers = nullptr; + m_headers = NULL; if (m_curl) {curl_easy_setopt(m_curl, CURLOPT_HTTPHEADER, m_headers);} } } -State::State(State && other) noexcept : - m_push(other.m_push), - m_recv_status_line(other.m_recv_status_line), - m_recv_all_headers(other.m_recv_all_headers), - m_offset(other.m_offset), - m_start_offset(other.m_start_offset), - m_status_code(other.m_status_code), - m_content_length(other.m_content_length), - m_stream(other.m_stream), - m_curl(other.m_curl), - m_headers(other.m_headers), - m_headers_copy(std::move(other.m_headers_copy)), - m_resp_protocol(std::move(m_resp_protocol)) + +void State::Move(State &other) { + m_push = other.m_push; + m_recv_status_line = other.m_recv_status_line; + m_recv_all_headers = other.m_recv_all_headers; + m_offset = other.m_offset; + m_start_offset = other.m_start_offset; + m_status_code = other.m_status_code; + m_content_length = other.m_content_length; + m_stream = other.m_stream; + m_curl = other.m_curl; + m_headers = other.m_headers; + m_headers_copy = other.m_headers_copy; + m_resp_protocol = m_resp_protocol; + curl_easy_setopt(m_curl, CURLOPT_HEADERDATA, this); if (m_push) { curl_easy_setopt(m_curl, CURLOPT_READDATA, this); @@ -43,8 +46,9 @@ State::State(State && other) noexcept : curl_easy_setopt(m_curl, CURLOPT_WRITEDATA, this); } other.m_headers_copy.clear(); - other.m_curl = nullptr; - other.m_headers = nullptr; + other.m_curl = NULL; + other.m_headers = NULL; + other.m_stream = NULL; } bool State::InstallHandlers(CURL *curl) { @@ -56,7 +60,7 @@ bool State::InstallHandlers(CURL *curl) { curl_easy_setopt(curl, CURLOPT_READFUNCTION, &State::ReadCB); curl_easy_setopt(curl, CURLOPT_READDATA, this); struct stat buf; - if (SFS_OK == m_stream.Stat(&buf)) { + if (SFS_OK == m_stream->Stat(&buf)) { curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, buf.st_size); } } else { @@ -76,21 +80,23 @@ bool State::InstallHandlers(CURL *curl) { * Handle the 'Copy-Headers' feature */ void State::CopyHeaders(XrdHttpExtReq &req) { - struct curl_slist *list = nullptr; - for (auto &hdr : req.headers) { - if (hdr.first == "Copy-Header") { - list = curl_slist_append(list, hdr.second.c_str()); - m_headers_copy.emplace_back(hdr.second); + struct curl_slist *list = NULL; + for (std::map::const_iterator hdr_iter = req.headers.begin(); + hdr_iter != req.headers.end(); + hdr_iter++) { + if (hdr_iter->first == "Copy-Header") { + list = curl_slist_append(list, hdr_iter->second.c_str()); + m_headers_copy.emplace_back(hdr_iter->second); } // Note: len("TransferHeader") == 14 - if (!hdr.first.compare(0, 14, "TransferHeader")) { + if (!hdr_iter->first.compare(0, 14, "TransferHeader")) { std::stringstream ss; - ss << hdr.first.substr(14) << ": " << hdr.second; + ss << hdr_iter->first.substr(14) << ": " << hdr_iter->second; list = curl_slist_append(list, ss.str().c_str()); m_headers_copy.emplace_back(ss.str()); } } - if (list != nullptr) { + if (list != NULL) { curl_easy_setopt(m_curl, CURLOPT_HTTPHEADER, list); m_headers = list; } @@ -170,7 +176,7 @@ size_t State::WriteCB(void *buffer, size_t size, size_t nitems, void *userdata) } int State::Write(char *buffer, size_t size) { - int retval = m_stream.Write(m_start_offset + m_offset, buffer, size); + int retval = m_stream->Write(m_start_offset + m_offset, buffer, size); if (retval == SFS_ERROR) { return -1; } @@ -186,7 +192,7 @@ size_t State::ReadCB(void *buffer, size_t size, size_t nitems, void *userdata) { } int State::Read(char *buffer, size_t size) { - int retval = m_stream.Read(m_start_offset + m_offset, buffer, size); + int retval = m_stream->Read(m_start_offset + m_offset, buffer, size); if (retval == SFS_ERROR) { return -1; } @@ -195,25 +201,27 @@ int State::Read(char *buffer, size_t size) { return retval; } -State State::Duplicate() { +State *State::Duplicate() { CURL *curl = curl_easy_duphandle(m_curl); if (!curl) { throw std::runtime_error("Failed to duplicate existing curl handle."); } - State state(0, m_stream, curl, m_push); + State *state = new State(0, *m_stream, curl, m_push); if (m_headers) { - state.m_headers_copy.reserve(m_headers_copy.size()); - for (auto &header : m_headers_copy) { - state.m_headers = curl_slist_append(state.m_headers, header.c_str()); - state.m_headers_copy.push_back(header); + state->m_headers_copy.reserve(m_headers_copy.size()); + for (std::vector::const_iterator header_iter = m_headers_copy.begin(); + header_iter != m_headers_copy.end(); + header_iter++) { + state->m_headers = curl_slist_append(state->m_headers, header_iter->c_str()); + state->m_headers_copy.push_back(*header_iter); } - curl_easy_setopt(curl, CURLOPT_HTTPHEADER, nullptr); - curl_easy_setopt(curl, CURLOPT_HTTPHEADER, state.m_headers); + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, NULL); + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, state->m_headers); } - return std::move(state); + return state; } void State::SetTransferParameters(off_t offset, size_t size) { @@ -227,5 +235,5 @@ void State::SetTransferParameters(off_t offset, size_t size) { int State::AvailableBuffers() const { - return m_stream.AvailableBuffers(); + return m_stream->AvailableBuffers(); } diff --git a/src/XrdTpc/state.hh b/src/XrdTpc/state.hh index 5d52ec39cdd..9ebc5bb91ae 100644 --- a/src/XrdTpc/state.hh +++ b/src/XrdTpc/state.hh @@ -18,14 +18,33 @@ class Stream; class State { public: + State() : + m_push(true), + m_recv_status_line(false), + m_recv_all_headers(false), + m_offset(0), + m_start_offset(0), + m_status_code(-1), + m_content_length(-1), + m_stream(NULL), + m_curl(NULL), + m_headers(NULL) + {} + // Note that we are "borrowing" a reference to the curl handle; // it is not owned / freed by the State object. However, we use it // as if there's only one handle per State. State (off_t start_offset, Stream &stream, CURL *curl, bool push) : m_push(push), + m_recv_status_line(false), + m_recv_all_headers(false), + m_offset(0), m_start_offset(start_offset), - m_stream(stream), - m_curl(curl) + m_status_code(-1), + m_content_length(-1), + m_stream(&stream), + m_curl(curl), + m_headers(NULL) { InstallHandlers(curl); } @@ -54,14 +73,19 @@ public: // Duplicate the current state; all settings are copied over, but those // related to the transient state are reset as if from a constructor. - State Duplicate(); + State *Duplicate(); - State(const State&) = delete; - State(State &&) noexcept; + // Move the contents of a State object. To be replaced by a move + // constructor once C++11 is allowed in XRootD. + void Move (State &other); private: bool InstallHandlers(CURL *curl); + State(const State&); + // Add back once C++11 is available + //State(State &&) noexcept; + // libcurl callback functions, along with the corresponding class methods. static size_t HeaderCB(char *buffer, size_t size, size_t nitems, void *userdata); @@ -71,16 +95,16 @@ private: static size_t ReadCB(void *buffer, size_t size, size_t nitems, void *userdata); int Read(char *buffer, size_t size); - bool m_push{true}; // whether we are transferring in "push-mode" - bool m_recv_status_line{false}; // whether we have received a status line in the response from the remote host. - bool m_recv_all_headers{false}; // true if we have seen the end of headers. - off_t m_offset{0}; // number of bytes we have received. - off_t m_start_offset{0}; // offset where we started in the file. - int m_status_code{-1}; // status code from HTTP response. - off_t m_content_length{-1}; // value of Content-Length header, if we received one. - Stream &m_stream; // stream corresponding to this transfer. - CURL *m_curl{nullptr}; // libcurl handle - struct curl_slist *m_headers{nullptr}; // any headers we set as part of the libcurl request. + bool m_push; // whether we are transferring in "push-mode" + bool m_recv_status_line; // whether we have received a status line in the response from the remote host. + bool m_recv_all_headers; // true if we have seen the end of headers. + off_t m_offset; // number of bytes we have received. + off_t m_start_offset; // offset where we started in the file. + int m_status_code; // status code from HTTP response. + off_t m_content_length; // value of Content-Length header, if we received one. + Stream *m_stream; // stream corresponding to this transfer. + CURL *m_curl; // libcurl handle + struct curl_slist *m_headers; // any headers we set as part of the libcurl request. std::vector m_headers_copy; // Copies of custom headers. std::string m_resp_protocol; // Response protocol in the HTTP status line. }; diff --git a/src/XrdTpc/stream.cpp b/src/XrdTpc/stream.cpp index b2e55304330..af7d65fde56 100644 --- a/src/XrdTpc/stream.cpp +++ b/src/XrdTpc/stream.cpp @@ -7,6 +7,12 @@ using namespace TPC; Stream::~Stream() { + for (std::vector::iterator buffer_iter = m_buffers.begin(); + buffer_iter != m_buffers.end(); + buffer_iter++) { + delete *buffer_iter; + *buffer_iter = NULL; + } m_fh->close(); } @@ -45,18 +51,20 @@ Stream::Write(off_t offset, const char *buf, size_t size) size_t avail_count = 0; do { avail_count = 0; - avail_entry = nullptr; + avail_entry = NULL; buffer_was_written = false; - for (Entry &entry : m_buffers) { + for (std::vector::iterator entry_iter = m_buffers.begin(); + entry_iter != m_buffers.end(); + entry_iter++) { // Always try to dump from memory. - if (entry.Write(*this) > 0) { + if ((*entry_iter)->Write(*this) > 0) { buffer_was_written = true; } - if (entry.Available()) { // Empty buffer - if (!avail_entry) {avail_entry = &entry;} + if ((*entry_iter)->Available()) { // Empty buffer + if (!avail_entry) {avail_entry = *entry_iter;} avail_count ++; } - else if (!buffer_accepted && entry.Accept(offset, buf, size)) { + else if (!buffer_accepted && (*entry_iter)->Accept(offset, buf, size)) { buffer_accepted = true; } } @@ -75,8 +83,10 @@ Stream::Write(off_t offset, const char *buf, size_t size) // If we have low buffer occupancy, then release memory. if ((m_buffers.size() > 2) && (m_avail_count * 2 > m_buffers.size())) { - for (Entry &entry : m_buffers) { - entry.ShrinkIfUnused(); + for (std::vector::iterator entry_iter = m_buffers.begin(); + entry_iter != m_buffers.end(); + entry_iter++) { + (*entry_iter)->ShrinkIfUnused(); } } diff --git a/src/XrdTpc/stream.hh b/src/XrdTpc/stream.hh index f123ec42301..afb54293f79 100644 --- a/src/XrdTpc/stream.hh +++ b/src/XrdTpc/stream.hh @@ -23,9 +23,9 @@ public: : m_avail_count(max_blocks), m_fh(std::move(fh)) { - //m_buffers.reserve(max_blocks); + m_buffers.reserve(max_blocks); for (size_t idx=0; idx < max_blocks; idx++) { - m_buffers.emplace_back(buffer_size); + m_buffers.push_back(new Entry(buffer_size)); } } @@ -44,12 +44,11 @@ private: class Entry { public: Entry(size_t capacity) : - m_capacity(capacity) + m_offset(-1), + m_capacity(capacity), + m_size(0) {} - Entry(const Entry&) = delete; - Entry(Entry&&) = default; - bool Available() const {return m_offset == -1;} int Write(Stream &stream) { @@ -91,23 +90,34 @@ private: void ShrinkIfUnused() { if (!Available()) {return;} +#if __cplusplus > 199711L m_buffer.shrink_to_fit(); +#endif + } + + void Move(Entry &other) { + m_buffer.swap(other.m_buffer); + m_offset = other.m_offset; + m_size = other.m_size; } private: + + Entry(const Entry&) = delete; + bool CanWrite(Stream &stream) const { return (m_size > 0) && (m_offset == stream.m_offset); } - off_t m_offset{-1}; // Offset within file that m_buffer[0] represents. - const size_t m_capacity; - size_t m_size{0}; // Number of bytes held in buffer. + off_t m_offset; // Offset within file that m_buffer[0] represents. + size_t m_capacity; + size_t m_size; // Number of bytes held in buffer. std::vector m_buffer; }; size_t m_avail_count; std::unique_ptr m_fh; - off_t m_offset{0}; - std::vector m_buffers; + off_t m_offset; + std::vector m_buffers; }; } diff --git a/src/XrdTpc/tpc.cpp b/src/XrdTpc/tpc.cpp index d83fa8b1967..fde8d1186c3 100644 --- a/src/XrdTpc/tpc.cpp +++ b/src/XrdTpc/tpc.cpp @@ -3,6 +3,7 @@ #include "XrdOuc/XrdOucEnv.hh" #include "XrdSec/XrdSecEntity.hh" #include "XrdSfs/XrdSfsInterface.hh" +#include "XrdSys/XrdSysAtomics.hh" #include "XrdVersion.hh" #include @@ -11,17 +12,21 @@ #include #include -#include #include #include +#include #include "state.hh" #include "stream.hh" #include "tpc.hh" +#include "XrdTpcCurlMulti.hh" using namespace TPC; -std::atomic TPCHandler::m_monid{0}; +uint64_t TPCHandler::m_monid{0}; +int TPCHandler::m_marker_period = 5; +size_t TPCHandler::m_block_size = 16*1024*1024; +XrdSysMutex TPCHandler::m_monid_mutex; XrdVERSIONINFO(XrdHttpGetExtHandler, HttpTPC); @@ -97,19 +102,22 @@ int TPCHandler::ProcessReq(XrdHttpExtReq &req) { } TPCHandler::~TPCHandler() { - m_sfs = nullptr; // NOTE: must delete the SFS here as we may unload the destructor from memory below! + m_sfs = NULL; // NOTE: must delete the SFS here as we may unload the destructor from memory below! if (m_handle_base) { dlclose(m_handle_base); - m_handle_base = nullptr; + m_handle_base = NULL; } if (m_handle_chained) { dlclose(m_handle_chained); - m_handle_chained = nullptr; + m_handle_chained = NULL; } } TPCHandler::TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv) : - m_log(*log) + m_desthttps(false), + m_log(*log), + m_handle_base(NULL), + m_handle_chained(NULL) { if (!Configure(config, myEnv)) { throw std::runtime_error("Failed to configure the HTTP third-party-copy handler."); @@ -139,13 +147,13 @@ std::string TPCHandler::GetAuthz(XrdHttpExtReq &req) { int TPCHandler::RedirectTransfer(XrdHttpExtReq &req, XrdOucErrInfo &error) { int port; const char *host = error.getErrText(port); - if ((host == nullptr) || (*host == '\0') || (port == 0)) { + if ((host == NULL) || (*host == '\0') || (port == 0)) { char msg[] = "Internal error: redirect without hostname"; - return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); + return req.SendSimpleResp(500, NULL, NULL, msg, 0); } std::stringstream ss; ss << "Location: http" << (m_desthttps ? "s" : "") << "://" << host << ":" << port << "/" << req.resource; - return req.SendSimpleResp(307, nullptr, const_cast(ss.str().c_str()), nullptr, 0); + return req.SendSimpleResp(307, NULL, const_cast(ss.str().c_str()), NULL, 0); } int TPCHandler::OpenWaitStall(XrdSfsFile &fh, const std::string &resource, @@ -155,7 +163,7 @@ int TPCHandler::OpenWaitStall(XrdSfsFile &fh, const std::string &resource, int open_result; while (1) { open_result = fh.open(resource.c_str(), mode, openMode, &sec, - authz.empty() ? nullptr : authz.c_str()); + authz.empty() ? NULL: authz.c_str()); if ((open_result == SFS_STALL) || (open_result == SFS_STARTED)) { int secs_to_stall = fh.error.getErrInfo(); if (open_result == SFS_STARTED) {secs_to_stall = secs_to_stall/2 + 5;} @@ -179,18 +187,18 @@ int TPCHandler::DetermineXferSize(CURL *curl, XrdHttpExtReq &req, State &state, if (res == CURLE_HTTP_RETURNED_ERROR) { m_log.Emsg("DetermineXferSize", "Remote server failed request", curl_easy_strerror(res)); curl_easy_cleanup(curl); - return req.SendSimpleResp(500, nullptr, nullptr, const_cast(curl_easy_strerror(res)), 0); + return req.SendSimpleResp(500, NULL, NULL, const_cast(curl_easy_strerror(res)), 0); } else if (state.GetStatusCode() >= 400) { std::stringstream ss; ss << "Remote side failed with status code " << state.GetStatusCode(); m_log.Emsg("DetermineXferSize", "Remote server failed request", ss.str().c_str()); curl_easy_cleanup(curl); - return req.SendSimpleResp(500, nullptr, nullptr, const_cast(ss.str().c_str()), 0); + return req.SendSimpleResp(500, NULL, NULL, const_cast(ss.str().c_str()), 0); } else if (res) { m_log.Emsg("DetermineXferSize", "Curl failed", curl_easy_strerror(res)); char msg[] = "Unknown internal transfer failure"; curl_easy_cleanup(curl); - return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); + return req.SendSimpleResp(500, NULL, NULL, msg, 0); } curl_easy_setopt(curl, CURLOPT_NOBODY, 0); success = true; @@ -219,7 +227,7 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state, m_log.Emsg(log_prefix, "Failed to initialize a libcurl multi-handle"); char msg[] = "Failed to initialize internal server memory"; curl_easy_cleanup(curl); - return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); + return req.SendSimpleResp(500, NULL, NULL, msg, 0); } CURLMcode mres; @@ -230,7 +238,7 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state, char msg[] = "Failed to initialize internal server handle"; curl_easy_cleanup(curl); curl_multi_cleanup(multi_handle); - return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); + return req.SendSimpleResp(500, NULL, NULL, msg, 0); } // Start response to client prior to the first call to curl_multi_perform @@ -288,7 +296,11 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state, continue; } int fd_count; +#ifdef HAS_CURL_MULTI mres = curl_multi_wait(multi_handle, NULL, 0, max_sleep_time*1000, &fd_count); +#else + mres = curl_multi_wait_impl(multi_handle, max_sleep_time*1000, &fd_count); +#endif if (mres != CURLM_OK) { break; } @@ -302,7 +314,7 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state, curl_easy_cleanup(curl); curl_multi_cleanup(multi_handle); - return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); + return req.SendSimpleResp(500, NULL, NULL, msg, 0); } // Harvest any messages, looking for CURLMSG_DONE. @@ -324,7 +336,7 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state, curl_multi_cleanup(multi_handle); char msg[] = "Internal state error in libcurl"; m_log.Emsg(log_prefix, msg); - return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); + return req.SendSimpleResp(500, NULL, NULL, msg, 0); } curl_multi_cleanup(multi_handle); @@ -343,7 +355,7 @@ int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state, if ((retval = req.ChunkResp(ss.str().c_str(), 0))) { return retval; } - return req.ChunkResp(nullptr, 0); + return req.ChunkResp(NULL, 0); } #else int TPCHandler::RunCurlBasic(CURL *curl, XrdHttpExtReq &req, State &state, @@ -353,21 +365,21 @@ int TPCHandler::RunCurlBasic(CURL *curl, XrdHttpExtReq &req, State &state, curl_easy_cleanup(curl); if (res == CURLE_HTTP_RETURNED_ERROR) { m_log.Emsg(log_prefix, "Remote server failed request", curl_easy_strerror(res)); - return req.SendSimpleResp(500, nullptr, nullptr, + return req.SendSimpleResp(500, NULL, NULL, const_cast(curl_easy_strerror(res)), 0); } else if (state.GetStatusCode() >= 400) { std::stringstream ss; ss << "Remote side failed with status code " << state.GetStatusCode(); m_log.Emsg(log_prefix, "Remote server failed request", ss.str().c_str()); - return req.SendSimpleResp(500, nullptr, nullptr, + return req.SendSimpleResp(500, NULL, NULL, const_cast(ss.str().c_str()), 0); } else if (res) { m_log.Emsg(log_prefix, "Curl failed", curl_easy_strerror(res)); char msg[] = "Unknown internal transfer failure"; - return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); + return req.SendSimpleResp(500, NULL, NULL, msg, 0); } else { char msg[] = "Created"; - return req.SendSimpleResp(201, nullptr, nullptr, msg, 0); + return req.SendSimpleResp(201, NULL, NULL, msg, 0); } } #endif @@ -377,13 +389,16 @@ int TPCHandler::ProcessPushReq(const std::string & resource, XrdHttpExtReq &req) CURL *curl = curl_easy_init(); if (!curl) { char msg[] = "Failed to initialize internal transfer resources"; - return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); + return req.SendSimpleResp(500, NULL, NULL, msg, 0); } char *name = req.GetSecEntity().name; - std::unique_ptr fh(m_sfs->newFile(name, m_monid++)); + AtomicBeg(m_monid_mutex); + uint64_t file_monid = AtomicInc(m_monid); + AtomicEnd(m_monid_mutex); + std::unique_ptr fh(m_sfs->newFile(name, file_monid)); if (!fh.get()) { char msg[] = "Failed to initialize internal transfer file handle"; - return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); + return req.SendSimpleResp(500, NULL, NULL, msg, 0); } std::string authz = GetAuthz(req); @@ -395,10 +410,10 @@ int TPCHandler::ProcessPushReq(const std::string & resource, XrdHttpExtReq &req) int code; char msg_generic[] = "Failed to open local resource"; const char *msg = fh->error.getErrText(code); - if (msg == nullptr) msg = msg_generic; + if (msg == NULL) msg = msg_generic; int status_code = 400; if (code == EACCES) status_code = 401; - int resp_result = req.SendSimpleResp(status_code, nullptr, nullptr, + int resp_result = req.SendSimpleResp(status_code, NULL, NULL, const_cast(msg), 0); fh->close(); return resp_result; @@ -423,13 +438,13 @@ int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req) CURL *curl = curl_easy_init(); if (!curl) { char msg[] = "Failed to initialize internal transfer resources"; - return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); + return req.SendSimpleResp(500, NULL, NULL, msg, 0); } char *name = req.GetSecEntity().name; std::unique_ptr fh(m_sfs->newFile(name, m_monid++)); if (!fh.get()) { char msg[] = "Failed to initialize internal transfer file handle"; - return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); + return req.SendSimpleResp(500, NULL, NULL, msg, 0); } std::string authz = GetAuthz(req); XrdSfsFileOpenMode mode = SFS_O_CREAT; @@ -448,7 +463,7 @@ int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req) } if (stream_req < 1 || stream_req > 100) { char msg[] = "Invalid request for number of streams"; - return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); + return req.SendSimpleResp(500, NULL, NULL, msg, 0); } streams = stream_req; } @@ -462,11 +477,11 @@ int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req) int code; char msg_generic[] = "Failed to open local resource"; const char *msg = fh->error.getErrText(code); - if ((msg == nullptr) || (*msg == '\0')) msg = msg_generic; + if ((msg == NULL) || (*msg == '\0')) msg = msg_generic; int status_code = 400; if (code == EACCES) status_code = 401; if (code == EEXIST) status_code = 412; - int resp_result = req.SendSimpleResp(status_code, nullptr, nullptr, + int resp_result = req.SendSimpleResp(status_code, NULL, NULL, const_cast(msg), 0); fh->close(); return resp_result; @@ -496,13 +511,13 @@ extern "C" { XrdHttpExtHandler *XrdHttpGetExtHandler(XrdSysError *log, const char * config, const char * /*parms*/, XrdOucEnv *myEnv) { if (curl_global_init(CURL_GLOBAL_DEFAULT)) { log->Emsg("Initialize", "libcurl failed to initialize"); - return nullptr; + return NULL; } - TPCHandler *retval{nullptr}; + TPCHandler *retval{NULL}; if (!config) { log->Emsg("Initialize", "TPC handler requires a config filename in order to load"); - return nullptr; + return NULL; } try { log->Emsg("Initialize", "Will load configuration for the TPC handler from", config); diff --git a/src/XrdTpc/tpc.hh b/src/XrdTpc/tpc.hh index cfdd9896301..d627401b46b 100644 --- a/src/XrdTpc/tpc.hh +++ b/src/XrdTpc/tpc.hh @@ -1,7 +1,9 @@ -#include #include -#include +#include +#include + +#include "XrdSys/XrdSysPthread.hh" #include "XrdHttp/XrdHttpExtHandler.hh" @@ -19,10 +21,10 @@ public: TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv); virtual ~TPCHandler(); - virtual bool MatchesPath(const char *verb, const char *path) override; - virtual int ProcessReq(XrdHttpExtReq &req) override; + virtual bool MatchesPath(const char *verb, const char *path); + virtual int ProcessReq(XrdHttpExtReq &req); // Abstract method in the base class, but does not seem to be used - virtual int Init(const char *cfgfile) override {return 0;} + virtual int Init(const char *cfgfile) {return 0;} private: int ProcessOptionsReq(XrdHttpExtReq &req); @@ -48,6 +50,9 @@ private: // Experimental multi-stream version of RunCurlWithUpdates int RunCurlWithStreams(XrdHttpExtReq &req, TPC::State &state, const char *log_prefix, size_t streams); + int RunCurlWithStreamsImpl(XrdHttpExtReq &req, TPC::State &state, + const char *log_prefix, size_t streams, + std::vector streams_handles); #else int RunCurlBasic(CURL *curl, XrdHttpExtReq &req, TPC::State &state, const char *log_prefix); @@ -60,14 +65,15 @@ private: std::string &path2, bool &path2_alt); bool Configure(const char *configfn, XrdOucEnv *myEnv); - static constexpr int m_marker_period = 5; - static constexpr size_t m_block_size = 16*1024*1024; - bool m_desthttps{false}; + static int m_marker_period; + static size_t m_block_size; + bool m_desthttps; std::string m_cadir; - static std::atomic m_monid; + static XrdSysMutex m_monid_mutex; + static uint64_t m_monid; XrdSysError &m_log; std::unique_ptr m_sfs; - void *m_handle_base{nullptr}; - void *m_handle_chained{nullptr}; + void *m_handle_base; + void *m_handle_chained; }; }