From d46c51616e7a26474f977afea911bac3506e7530 Mon Sep 17 00:00:00 2001 From: Michal Simon Date: Tue, 8 May 2018 16:24:37 +0200 Subject: [PATCH] Revert "Merge pull request #679 from bbockelm/xrootd-tpc-rename" This reverts commit e3550ce0af5ee08deae3d4ea660f4f1ed2d52ced, reversing changes made to a768fd61f42d557bf80129eff88202fbe526c7a4. It has been decided that this PR is to be reverted as it does not follow XRootD coding conventions and brakes the build on vanilla RHEL6 platform in presence of libcurl-devel package. There should be a check in the CMake infrastructures that ensures the XrdTpc package will be only built if a proper version of libcurl-devel is present (or XrdTpc should built with version of libcurl-devel provided in raw RHEL6 environment). Moreover the new component should be packaged properly (if neccessary a _with_tpchttp macro should be tested in the spec file). --- cmake/XRootDFindLibs.cmake | 9 - src/CMakeLists.txt | 1 - src/XrdHttp.cmake | 5 +- src/XrdTpc.cmake | 51 ---- src/XrdTpc/CMakeLists.txt | 59 ---- src/XrdTpc/README.md | 121 -------- src/XrdTpc/configure.cpp | 182 ------------ src/XrdTpc/export-lib-symbols | 7 - src/XrdTpc/multistream.cpp | 333 ---------------------- src/XrdTpc/state.cpp | 231 --------------- src/XrdTpc/state.hh | 88 ------ src/XrdTpc/stream.cpp | 90 ------ src/XrdTpc/stream.hh | 113 -------- src/XrdTpc/tpc.cpp | 518 ---------------------------------- src/XrdTpc/tpc.hh | 73 ----- src/XrdTpc/xrootd-test-tpc | 100 ------- 16 files changed, 1 insertion(+), 1980 deletions(-) delete mode 100644 src/XrdTpc.cmake delete mode 100644 src/XrdTpc/CMakeLists.txt delete mode 100644 src/XrdTpc/README.md delete mode 100644 src/XrdTpc/configure.cpp delete mode 100644 src/XrdTpc/export-lib-symbols delete mode 100644 src/XrdTpc/multistream.cpp delete mode 100644 src/XrdTpc/state.cpp delete mode 100644 src/XrdTpc/state.hh delete mode 100644 src/XrdTpc/stream.cpp delete mode 100644 src/XrdTpc/stream.hh delete mode 100644 src/XrdTpc/tpc.cpp delete mode 100644 src/XrdTpc/tpc.hh delete mode 100755 src/XrdTpc/xrootd-test-tpc diff --git a/cmake/XRootDFindLibs.cmake b/cmake/XRootDFindLibs.cmake index d07063aad52..b1e23550446 100644 --- a/cmake/XRootDFindLibs.cmake +++ b/cmake/XRootDFindLibs.cmake @@ -27,9 +27,6 @@ if( SYSTEMD_FOUND ) add_definitions( -DHAVE_SYSTEMD ) endif() -include (FindPkgConfig) -pkg_check_modules(CURL libcurl) - if( ENABLE_CRYPTO ) find_package( OpenSSL ) if( OPENSSL_FOUND ) @@ -73,14 +70,8 @@ endif() if( ENABLE_HTTP ) if( OPENSSL_FOUND AND BUILD_CRYPTO ) set( BUILD_HTTP TRUE ) - if( CURL_FOUND ) - set( BUILD_TPC TRUE ) - else() - set( BUILD_TPC FALSE ) - endif() else() set( BUILD_HTTP FALSE ) - set( BUILD_TPC FALSE ) endif() endif() diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index eafa85da0bc..60cb227d377 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -38,7 +38,6 @@ include( XrdFileCache ) if( BUILD_HTTP ) include( XrdHttp ) - include( XrdTpc ) endif() if( BUILD_CEPH ) diff --git a/src/XrdHttp.cmake b/src/XrdHttp.cmake index 843f1c0bc8c..e9f2befb220 100644 --- a/src/XrdHttp.cmake +++ b/src/XrdHttp.cmake @@ -15,11 +15,9 @@ if( BUILD_HTTP ) #----------------------------------------------------------------------------- include_directories( ${OPENSSL_INCLUDE_DIR} ) - # Note this is marked as a shared library as XrdHttp plugins are expected to - # link against this for the XrdHttpExt class implementations. add_library( ${LIB_XRD_HTTP} - SHARED + MODULE XrdHttp/XrdHttpProtocol.cc XrdHttp/XrdHttpProtocol.hh XrdHttp/XrdHttpReq.cc XrdHttp/XrdHttpReq.hh XrdHttp/XrdHttpSecXtractor.hh @@ -42,7 +40,6 @@ if( BUILD_HTTP ) ${LIB_XRD_HTTP} PROPERTIES INTERFACE_LINK_LIBRARIES "" - SUFFIX ".so" LINK_INTERFACE_LIBRARIES "" ) #----------------------------------------------------------------------------- diff --git a/src/XrdTpc.cmake b/src/XrdTpc.cmake deleted file mode 100644 index 0b83958a0a2..00000000000 --- a/src/XrdTpc.cmake +++ /dev/null @@ -1,51 +0,0 @@ -include( XRootDCommon ) - -#------------------------------------------------------------------------------- -# Modules -#------------------------------------------------------------------------------- -set( LIB_XRD_TPC XrdHttpTPC-${PLUGIN_VERSION} ) - -#------------------------------------------------------------------------------- -# Shared library version -#------------------------------------------------------------------------------- - -if( BUILD_TPC ) - #----------------------------------------------------------------------------- - # The XrdHttp library - #----------------------------------------------------------------------------- - include_directories( ${CURL_INCLUDE_DIRS} ) - - add_library( - ${LIB_XRD_TPC} - MODULE - XrdTpc/configure.cpp - XrdTpc/multistream.cpp - XrdTpc/state.cpp XrdTpc/state.hh - XrdTpc/stream.cpp XrdTpc/stream.hh - XrdTpc/tpc.cpp XrdTpc/tpc.hh) - - target_link_libraries( - ${LIB_XRD_TPC} - XrdServer - XrdUtils - XrdHttp-${PLUGIN_VERSION} - dl - pthread - ${CURL_LIBRARIES} ) - - set_target_properties( - ${LIB_XRD_TPC} - PROPERTIES - INTERFACE_LINK_LIBRARIES "" - LINK_INTERFACE_LIBRARIES "" - LINK_FLAGS "-Wl,--version-script=${CMAKE_SOURCE_DIR}/src/XrdTpc/export-lib-symbols" - COMPILE_DEFINITIONS "XRD_CHUNK_RESP") - - #----------------------------------------------------------------------------- - # Install - #----------------------------------------------------------------------------- - install( - TARGETS ${LIB_XRD_TPC} - LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} ) - -endif() diff --git a/src/XrdTpc/CMakeLists.txt b/src/XrdTpc/CMakeLists.txt deleted file mode 100644 index 5c60269dba7..00000000000 --- a/src/XrdTpc/CMakeLists.txt +++ /dev/null @@ -1,59 +0,0 @@ - -cmake_minimum_required( VERSION 2.8 ) -project( xrootd-tpc ) - -set( CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/cmake ) - -find_package( Xrootd REQUIRED ) - -macro(use_cxx11) - if (CMAKE_VERSION VERSION_LESS "3.1") - if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU") - set (CMAKE_CXX_FLAGS "-std=gnu++11 ${CMAKE_CXX_FLAGS}") - endif () - else () - set (CMAKE_CXX_STANDARD 11) - endif () -endmacro(use_cxx11) -use_cxx11() - -# Chunked responses were introduced after Xrootd 4.8.0. Check to see if the symbol exists. -#include (CheckCXXSymbolExists) -SET( CMAKE_REQUIRED_INCLUDES "${XROOTD_PRIVATE_INCLUDES}" ) -SET( CMAKE_REQUIRED_LIBRARIES "${XROOTD_HTTP_LIB}" ) -SET( CMAKE_REQUIRED_FLAGS "" ) -include (CheckCXXSourceCompiles) -CHECK_CXX_SOURCE_COMPILES("#include - -int main(int argc, char** argv) -{ - (void)argv; - return ((int*)reinterpret_cast(&XrdHttpExtReq::ChunkResp))[argc]; -} -" XRD_CHUNK_RESP) - -if( CMAKE_COMPILER_IS_GNUCXX ) - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Werror" ) -endif() -SET( CMAKE_SHARED_LINKER_FLAGS "-Wl,--no-undefined") -SET( CMAKE_MODULE_LINKER_FLAGS "-Wl,--no-undefined") - -include (FindPkgConfig) -pkg_check_modules(CURL REQUIRED libcurl) - -include_directories(${XROOTD_INCLUDES} ${XROOTD_PRIVATE_INCLUDES} ${CURL_INCLUDE_DIRS}) - -add_library(XrdHttpTPC SHARED src/tpc.cpp src/state.cpp src/configure.cpp src/stream.cpp src/multistream.cpp) -if ( XRD_CHUNK_RESP ) - set_target_properties(XrdHttpTPC PROPERTIES COMPILE_DEFINITIONS "XRD_CHUNK_RESP" ) -endif () - -target_link_libraries(XrdHttpTPC -ldl ${XROOTD_UTILS_LIB} ${XROOTD_SERVER_LIB} ${XROOTD_HTTP_LIB} ${CURL_LIBRARIES}) -set_target_properties(XrdHttpTPC PROPERTIES OUTPUT_NAME "XrdHttpTPC-4" SUFFIX ".so" LINK_FLAGS "-Wl,--version-script=${CMAKE_SOURCE_DIR}/configs/export-lib-symbols") - -SET(LIB_INSTALL_DIR "${CMAKE_INSTALL_PREFIX}/lib" CACHE PATH "Install path for libraries") - -install( - TARGETS XrdHttpTPC - LIBRARY DESTINATION ${LIB_INSTALL_DIR} -) diff --git a/src/XrdTpc/README.md b/src/XrdTpc/README.md deleted file mode 100644 index a0845d23646..00000000000 --- a/src/XrdTpc/README.md +++ /dev/null @@ -1,121 +0,0 @@ -# HTTPS Third Party Copy for XRootD - -The `xrootd-tpc` module provides an implementation of HTTPS third-party-copy -for the HTTPS implementation inside XRootD. - -To enable, set the following in the configuration file: - -``` -http.exthandler xrdtpc libXrdHttpTPC.so -``` - - -## HTTPS TPC technical details. - -Third-party-copy is done using the standard WebDAV copy verb. The actors involved are: - -1. The client orchestrating the transfer. -2. The source server where the resource is read from. -3. The destination server where the resource is written to. - -The client initiates the third party copy by issuing a COPY request to _either_ the source or destination -endpoint. Typically, when this is done, the initial endpoint redirects the client to a second service -(the disk server) that will performs the actual transfer. - -When the client contacts the source server, it should set the `Destination` header so the source knows -where to put the data. Analogously, if the client first contacts the destination server, it should set -the `Source` header. - -For the former case, the interaction looks like this: - -``` --> COPY /sfn/of/source/replica HTTP/1.1 - Destination: https:///pfn/of/dest/replica -<- HTTP/1.1 302 Found - Location: https:///pfn/of/source/replica -``` - -where `` is the service that will actually perform the transfer. - -The client would follow the redirection and retry the `COPY`: - -``` --> COPY /sfn/of/source/replica HTTP/1.1 - Destination: https:///pfn/of/dest/replica -<- HTTP/1.1 201 Created - Transfer-Encoding: chunked -``` - -The source server *should* respond with an appropriate status code (such as 201) in a timely manner. -As a TPC can take a significant amount of time, the source server SHOULD NOT wait until the transfer is -finished before responding with a success. In the case when a transfer can be started, it is recommended -that the response be started as soon as possible. - -In the case of a transfer being started (or queued) by the source server, it SHOULD use chunked encoding -and send a multipart response. - -Next `` will connect to the destination endpoint and actually perform the `PUT`: - -``` --> PUT /pfn/of/destination/replica HTTP/1.1 -<- HTTP/1.1 201 Created -``` - -As the PUT is ongoing, the source disk server should send back a periodic transfer chunk of the following -form: - -``` -Perf Marker - Timestamp: 1360578938 - Stripe Index: 0 - Stripe Bytes Transferred: 49397760 - Total Stripe Count: 1 -End -``` - -Timestamps should be seconds from Unix epoch. It is recommended that the time period between chunks be -less than 30 seconds. - -If the transfer ultimately succeeds, then the last chunk should be of the following form: - -``` -success: Created -``` - -Here, `success` indicates the transfer status: subsequent text is informational for the user. - -Failure of the transfer can be indicated with the prefix `failed` or `failure`: - -``` -failure: Network connection unexpectedly closed. -``` - -Finally, if the source disk server decides to cancel or abort the transfer, use the `aborted` prefix: - -``` -aborted: Transfer took too long. -``` - -There is an analogous case when the client contacts the destination server to perform the `COPY` and -sets the `Source` header. In such a case, the response to the client looks identical but the destination -server will perform a `GET` instead of a `PUT`. - -In some cases, the user may want the server performing the transfer to append additional headers -(such as an HTTP `Authorization` header) to the transfer it initiates. In such case, the client should -utilize the `TransferHeader` mechanism. Add a header of the form: - -``` -TransferHeader
: -``` - -For example, if the client sends the following to the source server as part of its `COPY` request: - -``` -TransferHeaderAuthorization: Basic cGF1bDpwYXNzd29yZA== -``` - -then the source server should set the following header as part of its `PUT` request to the destination: - -``` -Authorization: Basic cGF1bDpwYXNzd29yZA== -``` diff --git a/src/XrdTpc/configure.cpp b/src/XrdTpc/configure.cpp deleted file mode 100644 index cf9915cdac6..00000000000 --- a/src/XrdTpc/configure.cpp +++ /dev/null @@ -1,182 +0,0 @@ - -#include "tpc.hh" - -#include -#include - -#include "XrdOuc/XrdOucStream.hh" -#include "XrdOuc/XrdOucPinPath.hh" -#include "XrdSfs/XrdSfsInterface.hh" - -extern XrdSfsFileSystem *XrdSfsGetDefaultFileSystem(XrdSfsFileSystem *native_fs, - XrdSysLogger *lp, - const char *configfn, - XrdOucEnv *EnvInfo); - - -using namespace TPC; - - -static XrdSfsFileSystem *load_sfs(void *handle, bool alt, XrdSysError &log, const std::string &libpath, const char *configfn, XrdOucEnv &myEnv, XrdSfsFileSystem *prior_sfs) { - XrdSfsFileSystem *sfs = nullptr; - if (alt) { - auto ep = (XrdSfsFileSystem *(*)(XrdSfsFileSystem *, XrdSysLogger *, const char *, XrdOucEnv *)) - (dlsym(handle, "XrdSfsGetFileSystem2")); - if (ep == nullptr) { - log.Emsg("Config", "Failed to load XrdSfsGetFileSystem2 from library ", libpath.c_str(), dlerror()); - return nullptr; - } - sfs = ep(prior_sfs, log.logger(), configfn, &myEnv); - } else { - auto ep = (XrdSfsFileSystem *(*)(XrdSfsFileSystem *, XrdSysLogger *, const char *)) - (dlsym(nullptr, "XrdSfsGetFileSystem")); - if (ep == nullptr) { - log.Emsg("Config", "Failed to load XrdSfsGetFileSystem from library ", libpath.c_str(), dlerror()); - return nullptr; - } - sfs = ep(prior_sfs, log.logger(), configfn); - } - if (!sfs) { - log.Emsg("Config", "Failed to initialize filesystem library for TPC handler from ", libpath.c_str()); - return nullptr; - } - return sfs; -} - - -bool TPCHandler::ConfigureFSLib(XrdOucStream &Config, std::string &path1, bool &path1_alt, std::string &path2, bool &path2_alt) { - char *val; - if (!(val = Config.GetWord())) { - m_log.Emsg("Config", "fslib not specified"); - return false; - } - if (!strcmp("throttle", val)) { - path2 = "libXrdThrottle.so"; - if (!(val = Config.GetWord())) { - m_log.Emsg("Config", "fslib throttle target library not specified"); - return false; - } - } - else if (!strcmp("-2", val)) { - path2_alt = true; - if (!(val = Config.GetWord())) { - m_log.Emsg("Config", "fslib library not specified"); - return false; - } - path2 = val; - } - else { - path2 = val; - } - if (!(val = Config.GetWord()) || !strcmp("default", val)) { - if (path2 == "libXrdThrottle.so") { - path1 = "default"; - } else if (!path2.empty()) { - path1 = path2; - path2 = ""; - path1_alt = path2_alt; - } - } else if (!strcmp("-2", val)) { - path1_alt = true; - if (!(val = Config.GetWord())) { - m_log.Emsg("Config", "fslib base library not specified"); - return false; - } - path1 = val; - } else { - path2 = val; - } - return true; -} - -bool TPCHandler::Configure(const char *configfn, XrdOucEnv *myEnv) -{ - XrdOucStream Config(&m_log, getenv("XRDINSTANCE"), myEnv, "=====> "); - - std::string authLib; - std::string authLibParms; - int cfgFD = open(configfn, O_RDONLY, 0); - if (cfgFD < 0) { - m_log.Emsg("Config", errno, "open config file", configfn); - return false; - } - Config.Attach(cfgFD); - const char *val; - std::string path2, path1 = "default"; - bool path1_alt = false, path2_alt = false; - while ((val = Config.GetMyFirstWord())) { - if (!strcmp("xrootd.fslib", val)) { - if (!ConfigureFSLib(Config, path1, path1_alt, path2, path2_alt)) { - Config.Close(); - m_log.Emsg("Config", "Failed to parse the xrootd.fslib directive"); - return false; - } - m_log.Emsg("Config", "xrootd.fslib line successfully processed by TPC handler/"); - } else if (!strcmp("http.desthttps", val)) { - if (!(val = Config.GetWord())) { - Config.Close(); - m_log.Emsg("Config", "http.desthttps value not specified"); - return false; - } - if (!strcmp("1", val) || !strcasecmp("yes", val) || !strcasecmp("true", val)) { - m_desthttps = true; - } else if (!strcmp("0", val) || !strcasecmp("no", val) || !strcasecmp("false", val)) { - m_desthttps = false; - } else { - Config.Close(); - m_log.Emsg("Config", "https.desthttps value is invalid", val); - return false; - } - } else if (!strcmp("http.cadir", val)) { - if (!(val = Config.GetWord())) { - Config.Close(); - m_log.Emsg("Config", "http.cadir value not specified"); - return false; - } - m_cadir = val; - } - } - Config.Close(); - - XrdSfsFileSystem *base_sfs = nullptr; - if (path1 == "default") { - m_log.Emsg("Config", "Loading the default filesystem"); - base_sfs = XrdSfsGetDefaultFileSystem(nullptr, m_log.logger(), configfn, myEnv); - m_log.Emsg("Config", "Finished loading the default filesystem"); - } else { - char resolvePath[2048]; - bool usedAltPath{true}; - if (!XrdOucPinPath(path1.c_str(), usedAltPath, resolvePath, 2048)) { - m_log.Emsg("Config", "Failed to locate appropriately versioned base filesystem library for ", path1.c_str()); - return false; - } - m_handle_base = dlopen(resolvePath, RTLD_LOCAL|RTLD_NOW); - if (m_handle_base == nullptr) { - m_log.Emsg("Config", "Failed to base plugin ", resolvePath, dlerror()); - return false; - } - base_sfs = load_sfs(m_handle_base, path1_alt, m_log, path1, configfn, *myEnv, nullptr); - } - if (!base_sfs) { - m_log.Emsg("Config", "Failed to initialize filesystem library for TPC handler from ", path1.c_str()); - return false; - } - XrdSfsFileSystem *chained_sfs = nullptr; - if (!path2.empty()) { - char resolvePath[2048]; - bool usedAltPath{true}; - if (!XrdOucPinPath(path2.c_str(), usedAltPath, resolvePath, 2048)) { - m_log.Emsg("Config", "Failed to locate appropriately versioned chained filesystem library for ", path2.c_str()); - return false; - } - m_handle_chained = dlopen(resolvePath, RTLD_LOCAL|RTLD_NOW); - if (m_handle_chained == nullptr) { - m_log.Emsg("Config", "Failed to chained plugin ", resolvePath, dlerror()); - return false; - } - chained_sfs = load_sfs(m_handle_chained, path2_alt, m_log, path2, configfn, *myEnv, base_sfs); - } - m_sfs.reset(chained_sfs ? chained_sfs : base_sfs); - m_log.Emsg("Config", "Successfully configured the filesystem object for TPC handler"); - return true; -} diff --git a/src/XrdTpc/export-lib-symbols b/src/XrdTpc/export-lib-symbols deleted file mode 100644 index 4e6f0d221d8..00000000000 --- a/src/XrdTpc/export-lib-symbols +++ /dev/null @@ -1,7 +0,0 @@ -{ -global: - XrdHttpGetExtHandler*; - -local: - *; -}; diff --git a/src/XrdTpc/multistream.cpp b/src/XrdTpc/multistream.cpp deleted file mode 100644 index 2779d143f19..00000000000 --- a/src/XrdTpc/multistream.cpp +++ /dev/null @@ -1,333 +0,0 @@ -/** - * Implementation of multi-stream HTTP transfers for the TPCHandler - */ - -#ifdef XRD_CHUNK_RESP - -#include "tpc.hh" -#include "state.hh" - -#include "XrdSys/XrdSysError.hh" - -#include - -#include -#include - -using namespace TPC; - -class CurlHandlerSetupError : public std::runtime_error { -public: - CurlHandlerSetupError(const std::string &msg) : - std::runtime_error(msg) - {} - virtual ~CurlHandlerSetupError() {} -}; - -namespace { -class MultiCurlHandler { -public: - MultiCurlHandler(std::vector &states) : - m_handle(curl_multi_init()), - m_states(states) - { - if (m_handle == nullptr) { - throw CurlHandlerSetupError("Failed to initialize a libcurl multi-handle"); - } - m_avail_handles.reserve(states.size()); - m_active_handles.reserve(states.size()); - for (State &state : states) { - m_avail_handles.push_back(state.GetHandle()); - } - } - - ~MultiCurlHandler() - { - if (!m_handle) {return;} - for (CURL * easy_handle : m_active_handles) { - curl_multi_remove_handle(m_handle, easy_handle); - curl_easy_cleanup(easy_handle); - } - for (auto & easy_handle : m_avail_handles) { - curl_easy_cleanup(easy_handle); - } - curl_multi_cleanup(m_handle); - } - - MultiCurlHandler(const MultiCurlHandler &) = delete; - - CURLM *Get() const {return m_handle;} - - void FinishCurlXfer(CURL *curl) { - CURLMcode mres = curl_multi_remove_handle(m_handle, curl); - if (mres) { - std::stringstream ss; - ss << "Failed to remove transfer from set: " - << curl_multi_strerror(mres); - throw std::runtime_error(ss.str()); - } - for (auto &state : m_states) { - if (curl == state.GetHandle()) { - state.ResetAfterRequest(); - break; - } - } - for (auto iter = m_active_handles.begin(); - iter != m_active_handles.end(); - ++iter) - { - if (*iter == curl) { - m_active_handles.erase(iter); - break; - } - } - m_avail_handles.push_back(curl); - } - - off_t StartTransfers(off_t current_offset, off_t content_length, size_t block_size, - int &running_handles) { - bool started_new_xfer = false; - do { - size_t xfer_size = std::min(content_length - current_offset, static_cast(block_size)); - if (xfer_size == 0) {return current_offset;} - if (!(started_new_xfer = StartTransfer(current_offset, xfer_size))) { - break; - } else { - running_handles += 1; - } - current_offset += xfer_size; - } while (true); - return current_offset; - } - -private: - - bool StartTransfer(off_t offset, size_t size) { - if (!CanStartTransfer()) {return false;} - for (auto &handle : m_avail_handles) { - for (auto &state : m_states) { - if (state.GetHandle() == handle) { // This state object represents an idle handle. - state.SetTransferParameters(offset, size); - ActivateHandle(state); - return true; - } - } - } - return false; - } - - void ActivateHandle(State &state) { - CURL *curl = state.GetHandle(); - m_active_handles.push_back(curl); - CURLMcode mres; - mres = curl_multi_add_handle(m_handle, curl); - if (mres) { - std::stringstream ss; - ss << "Failed to add transfer to libcurl multi-handle" - << curl_multi_strerror(mres); - throw std::runtime_error(ss.str()); - } - for (auto iter = m_avail_handles.begin(); - iter != m_avail_handles.end(); - ++iter) - { - if (*iter == curl) { - m_avail_handles.erase(iter); - break; - } - } - } - - bool CanStartTransfer() const { - size_t idle_handles = m_avail_handles.size(); - size_t transfer_in_progress = 0; - for (auto &state : m_states) { - for (const auto &handle : m_active_handles) { - if (handle == state.GetHandle()) { - transfer_in_progress += state.BodyTransferInProgress(); - break; - } - } - } - if (!idle_handles) { - return false; - } - ssize_t available_buffers = m_states[0].AvailableBuffers(); - // To be conservative, set aside buffers for any transfers that have been activated - // but don't have their first responses back yet. - available_buffers -= (m_active_handles.size() - transfer_in_progress); - return available_buffers > 0; - } - - CURLM *m_handle; - std::vector m_avail_handles; - std::vector m_active_handles; - std::vector &m_states; -}; -} - - -int TPCHandler::RunCurlWithStreams(XrdHttpExtReq &req, State &state, - const char *log_prefix, size_t streams) -try -{ - int result; - bool success; - CURL *curl = state.GetHandle(); - if ((result = DetermineXferSize(curl, req, state, success)) || !success) { - return result; - } - off_t content_size = state.GetContentLength(); - off_t current_offset = 0; - - { - std::stringstream ss; - ss << "Successfully determined remote size for pull request: " << content_size; - m_log.Emsg("ProcessPullReq", ss.str().c_str()); - } - state.ResetAfterRequest(); - - std::vector handles; - handles.reserve(streams); - handles.emplace_back(std::move(state)); - for (size_t idx = 1; idx < streams; idx++) { - handles.emplace_back(handles[0].Duplicate()); // Makes a duplicate of the original state - } - - // Create the multi-handle and add in the current transfer to it. - MultiCurlHandler mch(handles); - CURLM *multi_handle = mch.Get(); - - // Start response to client prior to the first call to curl_multi_perform - int retval = req.StartChunkedResp(201, "Created", "Content-Type: text/plain"); - if (retval) { - return retval; - } - - // Start assigning transfers - int running_handles = 0; - current_offset = mch.StartTransfers(current_offset, content_size, m_block_size, running_handles); - - // Transfer loop: use curl to actually run the transfer, but periodically - // interrupt things to send back performance updates to the client. - time_t last_marker = 0; - CURLcode res = static_cast(-1); - CURLMcode mres; - do { - time_t now = time(NULL); - time_t next_marker = last_marker + m_marker_period; - if (now >= next_marker) { - if (SendPerfMarker(req, current_offset)) { - return -1; - } - last_marker = now; - } - - mres = curl_multi_perform(multi_handle, &running_handles); - if (mres == CURLM_CALL_MULTI_PERFORM) { - // curl_multi_perform should be called again immediately. On newer - // versions of curl, this is no longer used. - continue; - } else if (mres != CURLM_OK) { - break; - } - - // Harvest any messages, looking for CURLMSG_DONE. - CURLMsg *msg; - do { - int msgq = 0; - msg = curl_multi_info_read(multi_handle, &msgq); - if (msg && (msg->msg == CURLMSG_DONE)) { - CURL *easy_handle = msg->easy_handle; - mch.FinishCurlXfer(easy_handle); - res = msg->data.result; - // If any requests fail, cut off the entire transfer. - if (res != CURLE_OK) { - break; - } - } - } while (msg); - if (res != -1 && res != CURLE_OK) { - break; - } - - if (running_handles < static_cast(streams)) { - // Issue new transfers if there is still pending work to do. - // Otherwise, continue running until there are no handles left. - if (current_offset != content_size) { - current_offset = mch.StartTransfers(current_offset, content_size, - m_block_size, running_handles); - } else if (running_handles == 0) { - break; - } - } - - int64_t max_sleep_time = next_marker - time(NULL); - if (max_sleep_time <= 0) { - continue; - } - int fd_count; - mres = curl_multi_wait(multi_handle, NULL, 0, max_sleep_time*1000, &fd_count); - if (mres != CURLM_OK) { - break; - } - } while (running_handles); - - if (mres != CURLM_OK) { - std::stringstream ss; - ss << "Internal libcurl multi-handle error: " - << curl_multi_strerror(mres); - throw std::runtime_error(ss.str()); - } - - // Harvest any messages, looking for CURLMSG_DONE. - CURLMsg *msg; - do { - int msgq = 0; - msg = curl_multi_info_read(multi_handle, &msgq); - if (msg && (msg->msg == CURLMSG_DONE)) { - CURL *easy_handle = msg->easy_handle; - mch.FinishCurlXfer(easy_handle); - res = msg->data.result; // Transfer result will be examined below. - } - } while (msg); - - if (res == -1) { // No transfers returned?!? - throw std::runtime_error("Internal state error in libcurl"); - } - - // Generate the final response back to the client. - std::stringstream ss; - if (res != CURLE_OK) { - m_log.Emsg(log_prefix, "request failed when processing", curl_easy_strerror(res)); - ss << "failure: " << curl_easy_strerror(res); - } else if (current_offset != content_size) { - ss << "failure: Internal logic error led to early abort"; - m_log.Emsg(log_prefix, "Internal logic error led to early abort"); - } else if (state.GetStatusCode() >= 400) { - ss << "failure: Remote side failed with status code " << state.GetStatusCode(); - m_log.Emsg(log_prefix, "Remote server failed request", ss.str().c_str()); - } else { - ss << "success: Created"; - } - - if ((retval = req.ChunkResp(ss.str().c_str(), 0))) { - return retval; - } - return req.ChunkResp(nullptr, 0); -} -catch (CurlHandlerSetupError e) { - m_log.Emsg(log_prefix, e.what()); - return req.SendSimpleResp(500, nullptr, nullptr, e.what(), 0); -} catch (std::runtime_error e) { - m_log.Emsg(log_prefix, e.what()); - std::stringstream ss; - ss << "failure: " << e.what(); - int retval; - if ((retval = req.ChunkResp(ss.str().c_str(), 0))) { - return retval; - } - return req.ChunkResp(nullptr, 0); -} - -#endif // XRD_CHUNK_RESP diff --git a/src/XrdTpc/state.cpp b/src/XrdTpc/state.cpp deleted file mode 100644 index c2c234ebf55..00000000000 --- a/src/XrdTpc/state.cpp +++ /dev/null @@ -1,231 +0,0 @@ - -#include -#include -#include - -#include "XrdVersion.hh" -#include "XrdHttp/XrdHttpExtHandler.hh" -#include "XrdSfs/XrdSfsInterface.hh" - -#include - -#include "state.hh" -#include "stream.hh" - -using namespace TPC; - -State::~State() { - if (m_headers) { - curl_slist_free_all(m_headers); - m_headers = nullptr; - if (m_curl) {curl_easy_setopt(m_curl, CURLOPT_HTTPHEADER, m_headers);} - } -} - -State::State(State && other) noexcept : - m_push(other.m_push), - m_recv_status_line(other.m_recv_status_line), - m_recv_all_headers(other.m_recv_all_headers), - m_offset(other.m_offset), - m_start_offset(other.m_start_offset), - m_status_code(other.m_status_code), - m_content_length(other.m_content_length), - m_stream(other.m_stream), - m_curl(other.m_curl), - m_headers(other.m_headers), - m_headers_copy(std::move(other.m_headers_copy)), - m_resp_protocol(std::move(m_resp_protocol)) -{ - curl_easy_setopt(m_curl, CURLOPT_HEADERDATA, this); - if (m_push) { - curl_easy_setopt(m_curl, CURLOPT_READDATA, this); - } else { - curl_easy_setopt(m_curl, CURLOPT_WRITEDATA, this); - } - other.m_headers_copy.clear(); - other.m_curl = nullptr; - other.m_headers = nullptr; -} - -bool State::InstallHandlers(CURL *curl) { - curl_easy_setopt(curl, CURLOPT_USERAGENT, "xrootd-tpc/" XrdVERSION); - curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, &State::HeaderCB); - curl_easy_setopt(curl, CURLOPT_HEADERDATA, this); - if (m_push) { - curl_easy_setopt(curl, CURLOPT_UPLOAD, 1); - curl_easy_setopt(curl, CURLOPT_READFUNCTION, &State::ReadCB); - curl_easy_setopt(curl, CURLOPT_READDATA, this); - struct stat buf; - if (SFS_OK == m_stream.Stat(&buf)) { - curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, buf.st_size); - } - } else { - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &State::WriteCB); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, this); - } - curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); - - // Require a minimum speed from the transfer: must move at least 1MB every 2 minutes - // (roughly 8KB/s). - curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, 2*60); - curl_easy_setopt(curl, CURLOPT_LOW_SPEED_LIMIT, 1024*1024); - return true; -} - -/** - * Handle the 'Copy-Headers' feature - */ -void State::CopyHeaders(XrdHttpExtReq &req) { - struct curl_slist *list = nullptr; - for (auto &hdr : req.headers) { - if (hdr.first == "Copy-Header") { - list = curl_slist_append(list, hdr.second.c_str()); - m_headers_copy.emplace_back(hdr.second); - } - // Note: len("TransferHeader") == 14 - if (!hdr.first.compare(0, 14, "TransferHeader")) { - std::stringstream ss; - ss << hdr.first.substr(14) << ": " << hdr.second; - list = curl_slist_append(list, ss.str().c_str()); - m_headers_copy.emplace_back(ss.str()); - } - } - if (list != nullptr) { - curl_easy_setopt(m_curl, CURLOPT_HTTPHEADER, list); - m_headers = list; - } -} - -void State::ResetAfterRequest() { - m_offset = 0; - m_status_code = -1; - m_content_length = -1; - m_recv_all_headers = false; - m_recv_status_line = false; -} - -size_t State::HeaderCB(char *buffer, size_t size, size_t nitems, void *userdata) -{ - State *obj = static_cast(userdata); - std::string header(buffer, size*nitems); - return obj->Header(header); -} - -int State::Header(const std::string &header) { - //printf("Recieved remote header (%d, %d): %s", m_recv_all_headers, m_recv_status_line, header.c_str()); - if (m_recv_all_headers) { // This is the second request -- maybe processed a redirect? - m_recv_all_headers = false; - m_recv_status_line = false; - } - if (!m_recv_status_line) { - std::stringstream ss(header); - std::string item; - if (!std::getline(ss, item, ' ')) return 0; - m_resp_protocol = item; - //printf("\n\nResponse protocol: %s\n", m_resp_protocol.c_str()); - if (!std::getline(ss, item, ' ')) return 0; - try { - m_status_code = std::stol(item); - } catch (...) { - return 0; - } - m_recv_status_line = true; - } else if (header.size() == 0 || header == "\n") { - m_recv_all_headers = true; - } - else if (header != "\r\n") { - // Parse the header - std::size_t found = header.find(":"); - if (found != std::string::npos) { - std::string header_name = header.substr(0, found); - std::transform(header_name.begin(), header_name.end(), header_name.begin(), ::tolower); - std::string header_value = header.substr(found+1); - if (header_name == "content-length") - { - try { - m_content_length = std::stoll(header_value); - } catch (...) { - // Header unparseable -- not a great sign, fail request. - //printf("Content-length header unparseable\n"); - return 0; - } - } - } else { - // Non-empty header that isn't the status line, but no ':' present -- - // malformed request? - //printf("Malformed header: %s\n", header.c_str()); - return 0; - } - } - return header.size(); -} - -size_t State::WriteCB(void *buffer, size_t size, size_t nitems, void *userdata) { - State *obj = static_cast(userdata); - if (obj->GetStatusCode() < 0) { - return 0; - } // malformed request - got body before headers. - if (obj->GetStatusCode() >= 400) {return 0;} // Status indicates failure. - return obj->Write(static_cast(buffer), size*nitems); -} - -int State::Write(char *buffer, size_t size) { - int retval = m_stream.Write(m_start_offset + m_offset, buffer, size); - if (retval == SFS_ERROR) { - return -1; - } - m_offset += retval; - return retval; -} - -size_t State::ReadCB(void *buffer, size_t size, size_t nitems, void *userdata) { - State *obj = static_cast(userdata); - if (obj->GetStatusCode() < 0) {return 0;} // malformed request - got body before headers. - if (obj->GetStatusCode() >= 400) {return 0;} // Status indicates failure. - return obj->Read(static_cast(buffer), size*nitems); -} - -int State::Read(char *buffer, size_t size) { - int retval = m_stream.Read(m_start_offset + m_offset, buffer, size); - if (retval == SFS_ERROR) { - return -1; - } - m_offset += retval; - //printf("Read a total of %ld bytes.\n", m_offset); - return retval; -} - -State State::Duplicate() { - CURL *curl = curl_easy_duphandle(m_curl); - if (!curl) { - throw std::runtime_error("Failed to duplicate existing curl handle."); - } - - State state(0, m_stream, curl, m_push); - - if (m_headers) { - state.m_headers_copy.reserve(m_headers_copy.size()); - for (auto &header : m_headers_copy) { - state.m_headers = curl_slist_append(state.m_headers, header.c_str()); - state.m_headers_copy.push_back(header); - } - curl_easy_setopt(curl, CURLOPT_HTTPHEADER, nullptr); - curl_easy_setopt(curl, CURLOPT_HTTPHEADER, state.m_headers); - } - - return std::move(state); -} - -void State::SetTransferParameters(off_t offset, size_t size) { - m_start_offset = offset; - m_offset = 0; - m_content_length = size; - std::stringstream ss; - ss << offset << "-" << (offset+size-1); - curl_easy_setopt(m_curl, CURLOPT_RANGE, ss.str().c_str()); -} - -int State::AvailableBuffers() const -{ - return m_stream.AvailableBuffers(); -} diff --git a/src/XrdTpc/state.hh b/src/XrdTpc/state.hh deleted file mode 100644 index 5d52ec39cdd..00000000000 --- a/src/XrdTpc/state.hh +++ /dev/null @@ -1,88 +0,0 @@ -/** - * state.hh: - * - * Helper class for managing the state of a single TPC request. - */ - -#include -#include - -// Forward dec'ls -class XrdSfsFile; -class XrdHttpExtReq; -typedef void CURL; - -namespace TPC { -class Stream; - -class State { -public: - - // Note that we are "borrowing" a reference to the curl handle; - // it is not owned / freed by the State object. However, we use it - // as if there's only one handle per State. - State (off_t start_offset, Stream &stream, CURL *curl, bool push) : - m_push(push), - m_start_offset(start_offset), - m_stream(stream), - m_curl(curl) - { - InstallHandlers(curl); - } - - ~State(); - - void SetTransferParameters(off_t offset, size_t size); - - void CopyHeaders(XrdHttpExtReq &req); - - off_t BytesTransferred() const {return m_offset;} - - off_t GetContentLength() const {return m_content_length;} - - int GetStatusCode() const {return m_status_code;} - - void ResetAfterRequest(); - - CURL *GetHandle() const {return m_curl;} - - int AvailableBuffers() const; - - // Returns true if at least one byte of the response has been received, - // but not the entire contents of the response. - bool BodyTransferInProgress() const {return m_offset && (m_offset != m_content_length);} - - // Duplicate the current state; all settings are copied over, but those - // related to the transient state are reset as if from a constructor. - State Duplicate(); - - State(const State&) = delete; - State(State &&) noexcept; - -private: - bool InstallHandlers(CURL *curl); - - // libcurl callback functions, along with the corresponding class methods. - static size_t HeaderCB(char *buffer, size_t size, size_t nitems, - void *userdata); - int Header(const std::string &header); - static size_t WriteCB(void *buffer, size_t size, size_t nitems, void *userdata); - int Write(char *buffer, size_t size); - static size_t ReadCB(void *buffer, size_t size, size_t nitems, void *userdata); - int Read(char *buffer, size_t size); - - bool m_push{true}; // whether we are transferring in "push-mode" - bool m_recv_status_line{false}; // whether we have received a status line in the response from the remote host. - bool m_recv_all_headers{false}; // true if we have seen the end of headers. - off_t m_offset{0}; // number of bytes we have received. - off_t m_start_offset{0}; // offset where we started in the file. - int m_status_code{-1}; // status code from HTTP response. - off_t m_content_length{-1}; // value of Content-Length header, if we received one. - Stream &m_stream; // stream corresponding to this transfer. - CURL *m_curl{nullptr}; // libcurl handle - struct curl_slist *m_headers{nullptr}; // any headers we set as part of the libcurl request. - std::vector m_headers_copy; // Copies of custom headers. - std::string m_resp_protocol; // Response protocol in the HTTP status line. -}; - -}; diff --git a/src/XrdTpc/stream.cpp b/src/XrdTpc/stream.cpp deleted file mode 100644 index b2e55304330..00000000000 --- a/src/XrdTpc/stream.cpp +++ /dev/null @@ -1,90 +0,0 @@ - -#include "stream.hh" - -#include "XrdSfs/XrdSfsInterface.hh" - -using namespace TPC; - -Stream::~Stream() -{ - m_fh->close(); -} - - -int -Stream::Stat(struct stat* buf) -{ - return m_fh->stat(buf); -} - -int -Stream::Write(off_t offset, const char *buf, size_t size) -{ - bool buffer_accepted = false; - int retval = size; - if (offset < m_offset) { - return SFS_ERROR; - } - if (offset == m_offset) { - retval = m_fh->write(offset, buf, size); - buffer_accepted = true; - if (retval != SFS_ERROR) { - m_offset += retval; - } - // If there are no in-use buffers, then we don't need to - // do any accounting. - if (m_avail_count == m_buffers.size()) { - return retval; - } - } - // Even if we already accepted the current data, always - // iterate through available buffers and try to write as - // much out to disk as possible. - Entry *avail_entry; - bool buffer_was_written; - size_t avail_count = 0; - do { - avail_count = 0; - avail_entry = nullptr; - buffer_was_written = false; - for (Entry &entry : m_buffers) { - // Always try to dump from memory. - if (entry.Write(*this) > 0) { - buffer_was_written = true; - } - if (entry.Available()) { // Empty buffer - if (!avail_entry) {avail_entry = &entry;} - avail_count ++; - } - else if (!buffer_accepted && entry.Accept(offset, buf, size)) { - buffer_accepted = true; - } - } - } while ((avail_count != m_buffers.size()) && buffer_was_written); - m_avail_count = avail_count; - - if (!buffer_accepted) { // No place for this data in allocated buffers - if (!avail_entry) { // No available buffers to allocate. - return SFS_ERROR; - } - if (!avail_entry->Accept(offset, buf, size)) { // Empty buffer cannot accept?!? - return SFS_ERROR; - } - m_avail_count --; - } - - // If we have low buffer occupancy, then release memory. - if ((m_buffers.size() > 2) && (m_avail_count * 2 > m_buffers.size())) { - for (Entry &entry : m_buffers) { - entry.ShrinkIfUnused(); - } - } - - return retval; -} - -int -Stream::Read(off_t offset, char *buf, size_t size) -{ - return m_fh->read(offset, buf, size); -} diff --git a/src/XrdTpc/stream.hh b/src/XrdTpc/stream.hh deleted file mode 100644 index f123ec42301..00000000000 --- a/src/XrdTpc/stream.hh +++ /dev/null @@ -1,113 +0,0 @@ - -/** - * The "stream" interface is a simple abstraction of a file handle. - * - * The abstraction layer is necessary to do the necessary buffering - * of multi-stream writes where the underlying filesystem only - * supports single-stream writes. - */ - -#include -#include - -#include - -struct stat; - -class XrdSfsFile; - -namespace TPC { -class Stream { -public: - Stream(std::unique_ptr fh, size_t max_blocks, size_t buffer_size) - : m_avail_count(max_blocks), - m_fh(std::move(fh)) - { - //m_buffers.reserve(max_blocks); - for (size_t idx=0; idx < max_blocks; idx++) { - m_buffers.emplace_back(buffer_size); - } - } - - ~Stream(); - - int Stat(struct stat *); - - int Read(off_t offset, char *buffer, size_t size); - - int Write(off_t offset, const char *buffer, size_t size); - - size_t AvailableBuffers() const {return m_avail_count;} - -private: - - class Entry { - public: - Entry(size_t capacity) : - m_capacity(capacity) - {} - - Entry(const Entry&) = delete; - Entry(Entry&&) = default; - - bool Available() const {return m_offset == -1;} - - int Write(Stream &stream) { - if (Available() || !CanWrite(stream)) {return 0;} - // Currently, only full writes are accepted. - int size_desired = m_size; - int retval = stream.Write(m_offset, &m_buffer[0], size_desired); - m_size = 0; - m_offset = -1; - if (retval != size_desired) { - return -1; - } - return retval; - } - - bool Accept(off_t offset, const char *buf, size_t size) { - // Validate acceptance criteria. - if ((m_offset != -1) && (offset != m_offset + static_cast(m_size))) { - return false; - } - if (size > m_capacity - m_size) { - return false; - } - - // Inflate the underlying buffer if needed. - ssize_t new_bytes_needed = (m_size + size) - m_buffer.capacity(); - if (new_bytes_needed > 0) { - m_buffer.reserve(m_capacity); - } - - // Finally, do the copy. - memcpy(&m_buffer[0] + m_size, buf, size); - m_size += size; - if (m_offset == -1) { - m_offset = offset; - } - return true; - } - - void ShrinkIfUnused() { - if (!Available()) {return;} - m_buffer.shrink_to_fit(); - } - - private: - bool CanWrite(Stream &stream) const { - return (m_size > 0) && (m_offset == stream.m_offset); - } - - off_t m_offset{-1}; // Offset within file that m_buffer[0] represents. - const size_t m_capacity; - size_t m_size{0}; // Number of bytes held in buffer. - std::vector m_buffer; - }; - - size_t m_avail_count; - std::unique_ptr m_fh; - off_t m_offset{0}; - std::vector m_buffers; -}; -} diff --git a/src/XrdTpc/tpc.cpp b/src/XrdTpc/tpc.cpp deleted file mode 100644 index d83fa8b1967..00000000000 --- a/src/XrdTpc/tpc.cpp +++ /dev/null @@ -1,518 +0,0 @@ - -#include "XrdHttp/XrdHttpExtHandler.hh" -#include "XrdOuc/XrdOucEnv.hh" -#include "XrdSec/XrdSecEntity.hh" -#include "XrdSfs/XrdSfsInterface.hh" -#include "XrdVersion.hh" - -#include - -#include -#include - -#include -#include -#include -#include - -#include "state.hh" -#include "stream.hh" -#include "tpc.hh" - -using namespace TPC; - -std::atomic TPCHandler::m_monid{0}; - -XrdVERSIONINFO(XrdHttpGetExtHandler, HttpTPC); - - -static char *quote(const char *str) { - int l = strlen(str); - char *r = (char *) malloc(l*3 + 1); - r[0] = '\0'; - int i, j = 0; - - for (i = 0; i < l; i++) { - char c = str[i]; - - switch (c) { - case ' ': - strcpy(r + j, "%20"); - j += 3; - break; - case '[': - strcpy(r + j, "%5B"); - j += 3; - break; - case ']': - strcpy(r + j, "%5D"); - j += 3; - break; - case ':': - strcpy(r + j, "%3A"); - j += 3; - break; - case '/': - strcpy(r + j, "%2F"); - j += 3; - break; - default: - r[j++] = c; - } - } - - r[j] = '\0'; - - return r; -} - - -bool TPCHandler::MatchesPath(const char *verb, const char *path) { - return !strcmp(verb, "COPY") || !strcmp(verb, "OPTIONS"); -} - -static std::string PrepareURL(const std::string &input) { - if (!strncmp(input.c_str(), "davs://", 7)) { - return "https://" + input.substr(7); - } - return input; -} - -int TPCHandler::ProcessReq(XrdHttpExtReq &req) { - if (req.verb == "OPTIONS") { - return ProcessOptionsReq(req); - } - auto header = req.headers.find("Source"); - if (header != req.headers.end()) { - std::string src = PrepareURL(header->second); - m_log.Emsg("ProcessReq", "Pull request from", src.c_str()); - return ProcessPullReq(src, req); - } - header = req.headers.find("Destination"); - if (header != req.headers.end()) { - return ProcessPushReq(header->second, req); - } - m_log.Emsg("ProcessReq", "COPY verb requested but no source or destination specified."); - return req.SendSimpleResp(400, NULL, NULL, "No Source or Destination specified", 0); -} - -TPCHandler::~TPCHandler() { - m_sfs = nullptr; // NOTE: must delete the SFS here as we may unload the destructor from memory below! - if (m_handle_base) { - dlclose(m_handle_base); - m_handle_base = nullptr; - } - if (m_handle_chained) { - dlclose(m_handle_chained); - m_handle_chained = nullptr; - } -} - -TPCHandler::TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv) : - m_log(*log) -{ - if (!Configure(config, myEnv)) { - throw std::runtime_error("Failed to configure the HTTP third-party-copy handler."); - } -} - -/** - * Handle the OPTIONS verb as we have added a new one... - */ -int TPCHandler::ProcessOptionsReq(XrdHttpExtReq &req) { - return req.SendSimpleResp(200, NULL, (char *) "DAV: 1\r\nDAV: \r\nAllow: HEAD,GET,PUT,PROPFIND,DELETE,OPTIONS,COPY", NULL, 0); -} - -std::string TPCHandler::GetAuthz(XrdHttpExtReq &req) { - std::string authz; - auto authz_header = req.headers.find("Authorization"); - if (authz_header != req.headers.end()) { - char * quoted_url = quote(authz_header->second.c_str()); - std::stringstream ss; - ss << "authz=" << quoted_url; - free(quoted_url); - authz = ss.str(); - } - return authz; -} - -int TPCHandler::RedirectTransfer(XrdHttpExtReq &req, XrdOucErrInfo &error) { - int port; - const char *host = error.getErrText(port); - if ((host == nullptr) || (*host == '\0') || (port == 0)) { - char msg[] = "Internal error: redirect without hostname"; - return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); - } - std::stringstream ss; - ss << "Location: http" << (m_desthttps ? "s" : "") << "://" << host << ":" << port << "/" << req.resource; - return req.SendSimpleResp(307, nullptr, const_cast(ss.str().c_str()), nullptr, 0); -} - -int TPCHandler::OpenWaitStall(XrdSfsFile &fh, const std::string &resource, - int mode, int openMode, const XrdSecEntity &sec, - const std::string &authz) -{ - int open_result; - while (1) { - open_result = fh.open(resource.c_str(), mode, openMode, &sec, - authz.empty() ? nullptr : authz.c_str()); - if ((open_result == SFS_STALL) || (open_result == SFS_STARTED)) { - int secs_to_stall = fh.error.getErrInfo(); - if (open_result == SFS_STARTED) {secs_to_stall = secs_to_stall/2 + 5;} - sleep(secs_to_stall); - } - break; - } - return open_result; -} - -#ifdef XRD_CHUNK_RESP -/** - * Determine size at remote end. - */ -int TPCHandler::DetermineXferSize(CURL *curl, XrdHttpExtReq &req, State &state, - bool &success) { - success = false; - curl_easy_setopt(curl, CURLOPT_NOBODY, 1); - CURLcode res; - res = curl_easy_perform(curl); - if (res == CURLE_HTTP_RETURNED_ERROR) { - m_log.Emsg("DetermineXferSize", "Remote server failed request", curl_easy_strerror(res)); - curl_easy_cleanup(curl); - return req.SendSimpleResp(500, nullptr, nullptr, const_cast(curl_easy_strerror(res)), 0); - } else if (state.GetStatusCode() >= 400) { - std::stringstream ss; - ss << "Remote side failed with status code " << state.GetStatusCode(); - m_log.Emsg("DetermineXferSize", "Remote server failed request", ss.str().c_str()); - curl_easy_cleanup(curl); - return req.SendSimpleResp(500, nullptr, nullptr, const_cast(ss.str().c_str()), 0); - } else if (res) { - m_log.Emsg("DetermineXferSize", "Curl failed", curl_easy_strerror(res)); - char msg[] = "Unknown internal transfer failure"; - curl_easy_cleanup(curl); - return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); - } - curl_easy_setopt(curl, CURLOPT_NOBODY, 0); - success = true; - return 0; -} - -int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, off_t bytes_transferred) { - std::stringstream ss; - const std::string crlf = "\n"; - ss << "Perf Marker" << crlf; - ss << "Timestamp: " << time(NULL) << crlf; - ss << "Stripe Index: 0" << crlf; - ss << "Stripe Bytes Transferred: " << bytes_transferred << crlf; - ss << "Total Stripe Count: 1" << crlf; - ss << "End" << crlf; - - return req.ChunkResp(ss.str().c_str(), 0); -} - -int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state, - const char *log_prefix) -{ - // Create the multi-handle and add in the current transfer to it. - CURLM *multi_handle = curl_multi_init(); - if (!multi_handle) { - m_log.Emsg(log_prefix, "Failed to initialize a libcurl multi-handle"); - char msg[] = "Failed to initialize internal server memory"; - curl_easy_cleanup(curl); - return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); - } - - CURLMcode mres; - mres = curl_multi_add_handle(multi_handle, curl); - if (mres) { - m_log.Emsg(log_prefix, "Failed to add transfer to libcurl multi-handle", - curl_multi_strerror(mres)); - char msg[] = "Failed to initialize internal server handle"; - curl_easy_cleanup(curl); - curl_multi_cleanup(multi_handle); - return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); - } - - // Start response to client prior to the first call to curl_multi_perform - int retval = req.StartChunkedResp(201, "Created", "Content-Type: text/plain"); - if (retval) { - curl_easy_cleanup(curl); - curl_multi_cleanup(multi_handle); - return retval; - } - - // Transfer loop: use curl to actually run the transfer, but periodically - // interrupt things to send back performance updates to the client. - int running_handles = 1; - time_t last_marker = 0; - CURLcode res = static_cast(-1); - do { - time_t now = time(NULL); - time_t next_marker = last_marker + m_marker_period; - if (now >= next_marker) { - if (SendPerfMarker(req, state.BytesTransferred())) { - curl_multi_remove_handle(multi_handle, curl); - curl_easy_cleanup(curl); - curl_multi_cleanup(multi_handle); - return -1; - } - last_marker = now; - } - mres = curl_multi_perform(multi_handle, &running_handles); - if (mres == CURLM_CALL_MULTI_PERFORM) { - // curl_multi_perform should be called again immediately. On newer - // versions of curl, this is no longer used. - continue; - } else if (mres != CURLM_OK) { - break; - } else if (running_handles == 0) { - break; - } - //printf("There are %d running handles\n", running_handles); - - // Harvest any messages, looking for CURLMSG_DONE. - CURLMsg *msg; - do { - int msgq = 0; - msg = curl_multi_info_read(multi_handle, &msgq); - if (msg && (msg->msg == CURLMSG_DONE)) { - CURL *easy_handle = msg->easy_handle; - res = msg->data.result; - curl_multi_remove_handle(multi_handle, easy_handle); - curl_easy_cleanup(easy_handle); - } - } while (msg); - - int64_t max_sleep_time = next_marker - time(NULL); - if (max_sleep_time <= 0) { - continue; - } - int fd_count; - mres = curl_multi_wait(multi_handle, NULL, 0, max_sleep_time*1000, &fd_count); - if (mres != CURLM_OK) { - break; - } - } while (running_handles); - - if (mres != CURLM_OK) { - m_log.Emsg(log_prefix, "Internal libcurl multi-handle error", - curl_multi_strerror(mres)); - char msg[] = "Internal server error due to libcurl"; - curl_multi_remove_handle(multi_handle, curl); - curl_easy_cleanup(curl); - - curl_multi_cleanup(multi_handle); - return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); - } - - // Harvest any messages, looking for CURLMSG_DONE. - CURLMsg *msg; - do { - int msgq = 0; - msg = curl_multi_info_read(multi_handle, &msgq); - if (msg && (msg->msg == CURLMSG_DONE)) { - CURL *easy_handle = msg->easy_handle; - res = msg->data.result; - curl_multi_remove_handle(multi_handle, easy_handle); - curl_easy_cleanup(easy_handle); - } - } while (msg); - - if (res == -1) { // No transfers returned?!? - curl_multi_remove_handle(multi_handle, curl); - curl_easy_cleanup(curl); - curl_multi_cleanup(multi_handle); - char msg[] = "Internal state error in libcurl"; - m_log.Emsg(log_prefix, msg); - return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); - } - curl_multi_cleanup(multi_handle); - - // Generate the final response back to the client. - std::stringstream ss; - if (res != CURLE_OK) { - m_log.Emsg(log_prefix, "Remote server failed request", curl_easy_strerror(res)); - ss << "failure: " << curl_easy_strerror(res); - } else if (state.GetStatusCode() >= 400) { - ss << "failure: Remote side failed with status code " << state.GetStatusCode(); - m_log.Emsg(log_prefix, "Remote server failed request", ss.str().c_str()); - } else { - ss << "success: Created"; - } - - if ((retval = req.ChunkResp(ss.str().c_str(), 0))) { - return retval; - } - return req.ChunkResp(nullptr, 0); -} -#else -int TPCHandler::RunCurlBasic(CURL *curl, XrdHttpExtReq &req, State &state, - const char *log_prefix) { - CURLcode res; - res = curl_easy_perform(curl); - curl_easy_cleanup(curl); - if (res == CURLE_HTTP_RETURNED_ERROR) { - m_log.Emsg(log_prefix, "Remote server failed request", curl_easy_strerror(res)); - return req.SendSimpleResp(500, nullptr, nullptr, - const_cast(curl_easy_strerror(res)), 0); - } else if (state.GetStatusCode() >= 400) { - std::stringstream ss; - ss << "Remote side failed with status code " << state.GetStatusCode(); - m_log.Emsg(log_prefix, "Remote server failed request", ss.str().c_str()); - return req.SendSimpleResp(500, nullptr, nullptr, - const_cast(ss.str().c_str()), 0); - } else if (res) { - m_log.Emsg(log_prefix, "Curl failed", curl_easy_strerror(res)); - char msg[] = "Unknown internal transfer failure"; - return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); - } else { - char msg[] = "Created"; - return req.SendSimpleResp(201, nullptr, nullptr, msg, 0); - } -} -#endif - -int TPCHandler::ProcessPushReq(const std::string & resource, XrdHttpExtReq &req) { - m_log.Emsg("ProcessPushReq", "Starting a push request for resource", resource.c_str()); - CURL *curl = curl_easy_init(); - if (!curl) { - char msg[] = "Failed to initialize internal transfer resources"; - return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); - } - char *name = req.GetSecEntity().name; - std::unique_ptr fh(m_sfs->newFile(name, m_monid++)); - if (!fh.get()) { - char msg[] = "Failed to initialize internal transfer file handle"; - return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); - } - std::string authz = GetAuthz(req); - - int open_results = OpenWaitStall(*fh, req.resource, SFS_O_RDONLY, 0644, - req.GetSecEntity(), authz); - if (SFS_REDIRECT == open_results) { - return RedirectTransfer(req, fh->error); - } else if (SFS_OK != open_results) { - int code; - char msg_generic[] = "Failed to open local resource"; - const char *msg = fh->error.getErrText(code); - if (msg == nullptr) msg = msg_generic; - int status_code = 400; - if (code == EACCES) status_code = 401; - int resp_result = req.SendSimpleResp(status_code, nullptr, nullptr, - const_cast(msg), 0); - fh->close(); - return resp_result; - } - if (!m_cadir.empty()) { - curl_easy_setopt(curl, CURLOPT_CAPATH, m_cadir.c_str()); - } - curl_easy_setopt(curl, CURLOPT_URL, resource.c_str()); - - Stream stream(std::move(fh), 0, 0); - State state(0, stream, curl, true); - state.CopyHeaders(req); - -#ifdef XRD_CHUNK_RESP - return RunCurlWithUpdates(curl, req, state, "ProcessPushReq"); -#else - return RunCurlBasic(curl, req, state, "ProcessPushReq"); -#endif -} - -int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req) { - CURL *curl = curl_easy_init(); - if (!curl) { - char msg[] = "Failed to initialize internal transfer resources"; - return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); - } - char *name = req.GetSecEntity().name; - std::unique_ptr fh(m_sfs->newFile(name, m_monid++)); - if (!fh.get()) { - char msg[] = "Failed to initialize internal transfer file handle"; - return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); - } - std::string authz = GetAuthz(req); - XrdSfsFileOpenMode mode = SFS_O_CREAT; - auto overwrite_header = req.headers.find("Overwrite"); - if ((overwrite_header == req.headers.end()) || (overwrite_header->second == "T")) { - mode = SFS_O_TRUNC; - } - int streams = 1; - { - auto streams_header = req.headers.find("X-Number-Of-Streams"); - if (streams_header != req.headers.end()) { - int stream_req = -1; - try { - stream_req = std::stol(streams_header->second); - } catch (...) { // Handled below - } - if (stream_req < 1 || stream_req > 100) { - char msg[] = "Invalid request for number of streams"; - return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); - } - streams = stream_req; - } - } - - int open_result = OpenWaitStall(*fh, req.resource, mode|SFS_O_WRONLY, 0644, - req.GetSecEntity(), authz); - if (SFS_REDIRECT == open_result) { - return RedirectTransfer(req, fh->error); - } else if (SFS_OK != open_result) { - int code; - char msg_generic[] = "Failed to open local resource"; - const char *msg = fh->error.getErrText(code); - if ((msg == nullptr) || (*msg == '\0')) msg = msg_generic; - int status_code = 400; - if (code == EACCES) status_code = 401; - if (code == EEXIST) status_code = 412; - int resp_result = req.SendSimpleResp(status_code, nullptr, nullptr, - const_cast(msg), 0); - fh->close(); - return resp_result; - } - if (!m_cadir.empty()) { - curl_easy_setopt(curl, CURLOPT_CAPATH, m_cadir.c_str()); - } - curl_easy_setopt(curl, CURLOPT_URL, resource.c_str()); - Stream stream(std::move(fh), streams, m_block_size); - State state(0, stream, curl, false); - state.CopyHeaders(req); - -#ifdef XRD_CHUNK_RESP - if (streams > 1) { - return RunCurlWithStreams(req, state, "ProcessPullReq", streams); - } else { - return RunCurlWithUpdates(curl, req, state, "ProcessPullReq"); - } -#else - return RunCurlBasic(curl, req, state, "ProcessPullReq"); -#endif -} - - -extern "C" { - -XrdHttpExtHandler *XrdHttpGetExtHandler(XrdSysError *log, const char * config, const char * /*parms*/, XrdOucEnv *myEnv) { - if (curl_global_init(CURL_GLOBAL_DEFAULT)) { - log->Emsg("Initialize", "libcurl failed to initialize"); - return nullptr; - } - - TPCHandler *retval{nullptr}; - if (!config) { - log->Emsg("Initialize", "TPC handler requires a config filename in order to load"); - return nullptr; - } - try { - log->Emsg("Initialize", "Will load configuration for the TPC handler from", config); - retval = new TPCHandler(log, config, myEnv); - } catch (std::runtime_error &re) { - log->Emsg("Initialize", "Encountered a runtime failure when loading ", re.what()); - //printf("Provided env vars: %p, XrdInet*: %p\n", myEnv, myEnv->GetPtr("XrdInet*")); - } - return retval; -} - -} - diff --git a/src/XrdTpc/tpc.hh b/src/XrdTpc/tpc.hh deleted file mode 100644 index cfdd9896301..00000000000 --- a/src/XrdTpc/tpc.hh +++ /dev/null @@ -1,73 +0,0 @@ - -#include -#include -#include - -#include "XrdHttp/XrdHttpExtHandler.hh" - -class XrdOucErrInfo; -class XrdOucStream; -class XrdSfsFile; -class XrdSfsFileSystem; -typedef void CURL; - -namespace TPC { -class State; - -class TPCHandler : public XrdHttpExtHandler { -public: - TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv); - virtual ~TPCHandler(); - - virtual bool MatchesPath(const char *verb, const char *path) override; - virtual int ProcessReq(XrdHttpExtReq &req) override; - // Abstract method in the base class, but does not seem to be used - virtual int Init(const char *cfgfile) override {return 0;} - -private: - int ProcessOptionsReq(XrdHttpExtReq &req); - - static std::string GetAuthz(XrdHttpExtReq &req); - - int RedirectTransfer(XrdHttpExtReq &req, XrdOucErrInfo &error); - - int OpenWaitStall(XrdSfsFile &fh, const std::string &resource, int mode, - int openMode, const XrdSecEntity &sec, - const std::string &authz); - -#ifdef XRD_CHUNK_RESP - int DetermineXferSize(CURL *curl, XrdHttpExtReq &req, TPC::State &state, - bool &success); - - int SendPerfMarker(XrdHttpExtReq &req, off_t bytes_transferred); - - // Perform the libcurl transfer, periodically sending back chunked updates. - int RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, TPC::State &state, - const char *log_prefix); - - // Experimental multi-stream version of RunCurlWithUpdates - int RunCurlWithStreams(XrdHttpExtReq &req, TPC::State &state, - const char *log_prefix, size_t streams); -#else - int RunCurlBasic(CURL *curl, XrdHttpExtReq &req, TPC::State &state, - const char *log_prefix); -#endif - - int ProcessPushReq(const std::string & resource, XrdHttpExtReq &req); - int ProcessPullReq(const std::string &resource, XrdHttpExtReq &req); - - bool ConfigureFSLib(XrdOucStream &Config, std::string &path1, bool &path1_alt, - std::string &path2, bool &path2_alt); - bool Configure(const char *configfn, XrdOucEnv *myEnv); - - static constexpr int m_marker_period = 5; - static constexpr size_t m_block_size = 16*1024*1024; - bool m_desthttps{false}; - std::string m_cadir; - static std::atomic m_monid; - XrdSysError &m_log; - std::unique_ptr m_sfs; - void *m_handle_base{nullptr}; - void *m_handle_chained{nullptr}; -}; -} diff --git a/src/XrdTpc/xrootd-test-tpc b/src/XrdTpc/xrootd-test-tpc deleted file mode 100755 index 2508444a786..00000000000 --- a/src/XrdTpc/xrootd-test-tpc +++ /dev/null @@ -1,100 +0,0 @@ -#!/usr/bin/python - -import os -import sys -import argparse - -import requests - -def parse_args(): - parser = argparse.ArgumentParser(description='Drive test TPC transfers') - parser.add_argument("-t", "--token", help="Bearer token for transfer") - parser.add_argument("--dest-token", help="Bearer token for destination host") - parser.add_argument("--src-token", help="Bearer token for source host") - parser.add_argument("--push", dest="mode", action="store_const", const="push", help="Use push-mode for transfer (source manages transfer)", default="auto") - parser.add_argument("--pull", dest="mode", action="store_const", const="pull", help="Use pull-mode for transfer (destination manages transfer)") - parser.add_argument("--no-overwrite", dest="overwrite", action="store_false", default=True, help="Disable overwrite of existing files.") - parser.add_argument("--streams", dest="streams", help="Allow multiple streams", default=1, - type=int) - parser.add_argument("src") - parser.add_argument("dest") - - args = parser.parse_args() - - if args.streams < 1: - print >> sys.stderr, "Invalid number of streams specified: %d" % args.streams - sys.exit(1) - - if not args.token and (not args.src_token or not args.dest_token): - if 'SCITOKEN' in os.environ and os.path.exists(os.environ['SCITOKEN']): - args.token = os.environ['SCITOKEN'] - elif os.path.exists(os.path.expanduser("~/.scitokens/token")): - args.token = os.path.expanduser("~/.scitokens/token") - elif os.path.exists('/tmp/scitoken_u%d' % os.geteuid()): - args.token = '/tmp/scitoken_u%d' % os.geteuid() - else: - print >> sys.stderr, "No token file found in user environment." - sys.exit(1) - - if not args.src_token: - args.src_token = args.token - if not args.dest_token: - args.dest_token = args.token - - return args - -def get_token(fname): - with open(fname, "r") as fp: - for line in fp: - if line.startswith("#"): - continue - return line.strip() - raise Exception("No token found in specified file (%s)" % fname) - -def determine_mode(args): - verbs = requests.options(args.dest) - if 'allow' in verbs.headers: - if 'COPY' in verbs.headers['allow'].split(","): - return 'pull' - return 'push' - -def main(): - args = parse_args() - - src_token = get_token(args.src_token) - dest_token = get_token(args.dest_token) - - headers = {} - if args.overwrite: - headers['Overwrite'] = 'T' - else: - headers['Overwrite'] = 'F' - if args.streams > 1: - headers['X-Number-Of-Streams'] = str(args.streams) - mode = args.mode - if mode == "auto": - mode = determine_mode(args) - print "Auto detect determined %s mode" % mode - - with requests.Session() as session: - session.verify = '/etc/grid-security/certificates' - if mode == 'pull': - headers['Authorization'] = 'Bearer %s' % dest_token - headers['Source'] = args.src - headers['Copy-Header'] = 'Authorization: Bearer %s' % src_token - url = args.dest - else: # Push - headers['Authorization'] = 'Bearer %s' % src_token - headers['Destination'] = args.dest - headers['Copy-Header'] = 'Authorization: Bearer %s' % dest_token - url = args.src - try_again = True - while try_again: - resp = session.request('COPY', url, headers=headers, allow_redirects = True) - print resp.status_code - print resp.headers - print resp.text - break - -if __name__ == '__main__': - main()