From 1001a81ef360971796055cf347247d97d3f7239f Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Tue, 5 Jun 2018 09:17:11 -0400 Subject: [PATCH] Revert "Revert "Merge pull request #679 from bbockelm/xrootd-tpc-rename"" This reverts commit d46c51616e7a26474f977afea911bac3506e7530. Reverting-the-revert to start working on cleanup of this branch for C++03 support and eventual re-merge. --- cmake/XRootDFindLibs.cmake | 9 + src/CMakeLists.txt | 1 + src/XrdHttp.cmake | 5 +- src/XrdTpc.cmake | 51 ++++ src/XrdTpc/CMakeLists.txt | 59 ++++ src/XrdTpc/README.md | 121 ++++++++ src/XrdTpc/configure.cpp | 182 ++++++++++++ src/XrdTpc/export-lib-symbols | 7 + src/XrdTpc/multistream.cpp | 333 ++++++++++++++++++++++ src/XrdTpc/state.cpp | 231 +++++++++++++++ src/XrdTpc/state.hh | 88 ++++++ src/XrdTpc/stream.cpp | 90 ++++++ src/XrdTpc/stream.hh | 113 ++++++++ src/XrdTpc/tpc.cpp | 518 ++++++++++++++++++++++++++++++++++ src/XrdTpc/tpc.hh | 73 +++++ src/XrdTpc/xrootd-test-tpc | 100 +++++++ 16 files changed, 1980 insertions(+), 1 deletion(-) create mode 100644 src/XrdTpc.cmake create mode 100644 src/XrdTpc/CMakeLists.txt create mode 100644 src/XrdTpc/README.md create mode 100644 src/XrdTpc/configure.cpp create mode 100644 src/XrdTpc/export-lib-symbols create mode 100644 src/XrdTpc/multistream.cpp create mode 100644 src/XrdTpc/state.cpp create mode 100644 src/XrdTpc/state.hh create mode 100644 src/XrdTpc/stream.cpp create mode 100644 src/XrdTpc/stream.hh create mode 100644 src/XrdTpc/tpc.cpp create mode 100644 src/XrdTpc/tpc.hh create mode 100755 src/XrdTpc/xrootd-test-tpc diff --git a/cmake/XRootDFindLibs.cmake b/cmake/XRootDFindLibs.cmake index b1e23550446..d07063aad52 100644 --- a/cmake/XRootDFindLibs.cmake +++ b/cmake/XRootDFindLibs.cmake @@ -27,6 +27,9 @@ if( SYSTEMD_FOUND ) add_definitions( -DHAVE_SYSTEMD ) endif() +include (FindPkgConfig) +pkg_check_modules(CURL libcurl) + if( ENABLE_CRYPTO ) find_package( OpenSSL ) if( OPENSSL_FOUND ) @@ -70,8 +73,14 @@ endif() if( ENABLE_HTTP ) if( OPENSSL_FOUND AND BUILD_CRYPTO ) set( BUILD_HTTP TRUE ) + if( CURL_FOUND ) + set( BUILD_TPC TRUE ) + else() + set( BUILD_TPC FALSE ) + endif() else() set( BUILD_HTTP FALSE ) + set( BUILD_TPC FALSE ) endif() endif() diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 60cb227d377..eafa85da0bc 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -38,6 +38,7 @@ include( XrdFileCache ) if( BUILD_HTTP ) include( XrdHttp ) + include( XrdTpc ) endif() if( BUILD_CEPH ) diff --git a/src/XrdHttp.cmake b/src/XrdHttp.cmake index e9f2befb220..843f1c0bc8c 100644 --- a/src/XrdHttp.cmake +++ b/src/XrdHttp.cmake @@ -15,9 +15,11 @@ if( BUILD_HTTP ) #----------------------------------------------------------------------------- include_directories( ${OPENSSL_INCLUDE_DIR} ) + # Note this is marked as a shared library as XrdHttp plugins are expected to + # link against this for the XrdHttpExt class implementations. add_library( ${LIB_XRD_HTTP} - MODULE + SHARED XrdHttp/XrdHttpProtocol.cc XrdHttp/XrdHttpProtocol.hh XrdHttp/XrdHttpReq.cc XrdHttp/XrdHttpReq.hh XrdHttp/XrdHttpSecXtractor.hh @@ -40,6 +42,7 @@ if( BUILD_HTTP ) ${LIB_XRD_HTTP} PROPERTIES INTERFACE_LINK_LIBRARIES "" + SUFFIX ".so" LINK_INTERFACE_LIBRARIES "" ) #----------------------------------------------------------------------------- diff --git a/src/XrdTpc.cmake b/src/XrdTpc.cmake new file mode 100644 index 00000000000..0b83958a0a2 --- /dev/null +++ b/src/XrdTpc.cmake @@ -0,0 +1,51 @@ +include( XRootDCommon ) + +#------------------------------------------------------------------------------- +# Modules +#------------------------------------------------------------------------------- +set( LIB_XRD_TPC XrdHttpTPC-${PLUGIN_VERSION} ) + +#------------------------------------------------------------------------------- +# Shared library version +#------------------------------------------------------------------------------- + +if( BUILD_TPC ) + #----------------------------------------------------------------------------- + # The XrdHttp library + #----------------------------------------------------------------------------- + include_directories( ${CURL_INCLUDE_DIRS} ) + + add_library( + ${LIB_XRD_TPC} + MODULE + XrdTpc/configure.cpp + XrdTpc/multistream.cpp + XrdTpc/state.cpp XrdTpc/state.hh + XrdTpc/stream.cpp XrdTpc/stream.hh + XrdTpc/tpc.cpp XrdTpc/tpc.hh) + + target_link_libraries( + ${LIB_XRD_TPC} + XrdServer + XrdUtils + XrdHttp-${PLUGIN_VERSION} + dl + pthread + ${CURL_LIBRARIES} ) + + set_target_properties( + ${LIB_XRD_TPC} + PROPERTIES + INTERFACE_LINK_LIBRARIES "" + LINK_INTERFACE_LIBRARIES "" + LINK_FLAGS "-Wl,--version-script=${CMAKE_SOURCE_DIR}/src/XrdTpc/export-lib-symbols" + COMPILE_DEFINITIONS "XRD_CHUNK_RESP") + + #----------------------------------------------------------------------------- + # Install + #----------------------------------------------------------------------------- + install( + TARGETS ${LIB_XRD_TPC} + LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} ) + +endif() diff --git a/src/XrdTpc/CMakeLists.txt b/src/XrdTpc/CMakeLists.txt new file mode 100644 index 00000000000..5c60269dba7 --- /dev/null +++ b/src/XrdTpc/CMakeLists.txt @@ -0,0 +1,59 @@ + +cmake_minimum_required( VERSION 2.8 ) +project( xrootd-tpc ) + +set( CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/cmake ) + +find_package( Xrootd REQUIRED ) + +macro(use_cxx11) + if (CMAKE_VERSION VERSION_LESS "3.1") + if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU") + set (CMAKE_CXX_FLAGS "-std=gnu++11 ${CMAKE_CXX_FLAGS}") + endif () + else () + set (CMAKE_CXX_STANDARD 11) + endif () +endmacro(use_cxx11) +use_cxx11() + +# Chunked responses were introduced after Xrootd 4.8.0. Check to see if the symbol exists. +#include (CheckCXXSymbolExists) +SET( CMAKE_REQUIRED_INCLUDES "${XROOTD_PRIVATE_INCLUDES}" ) +SET( CMAKE_REQUIRED_LIBRARIES "${XROOTD_HTTP_LIB}" ) +SET( CMAKE_REQUIRED_FLAGS "" ) +include (CheckCXXSourceCompiles) +CHECK_CXX_SOURCE_COMPILES("#include + +int main(int argc, char** argv) +{ + (void)argv; + return ((int*)reinterpret_cast(&XrdHttpExtReq::ChunkResp))[argc]; +} +" XRD_CHUNK_RESP) + +if( CMAKE_COMPILER_IS_GNUCXX ) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Werror" ) +endif() +SET( CMAKE_SHARED_LINKER_FLAGS "-Wl,--no-undefined") +SET( CMAKE_MODULE_LINKER_FLAGS "-Wl,--no-undefined") + +include (FindPkgConfig) +pkg_check_modules(CURL REQUIRED libcurl) + +include_directories(${XROOTD_INCLUDES} ${XROOTD_PRIVATE_INCLUDES} ${CURL_INCLUDE_DIRS}) + +add_library(XrdHttpTPC SHARED src/tpc.cpp src/state.cpp src/configure.cpp src/stream.cpp src/multistream.cpp) +if ( XRD_CHUNK_RESP ) + set_target_properties(XrdHttpTPC PROPERTIES COMPILE_DEFINITIONS "XRD_CHUNK_RESP" ) +endif () + +target_link_libraries(XrdHttpTPC -ldl ${XROOTD_UTILS_LIB} ${XROOTD_SERVER_LIB} ${XROOTD_HTTP_LIB} ${CURL_LIBRARIES}) +set_target_properties(XrdHttpTPC PROPERTIES OUTPUT_NAME "XrdHttpTPC-4" SUFFIX ".so" LINK_FLAGS "-Wl,--version-script=${CMAKE_SOURCE_DIR}/configs/export-lib-symbols") + +SET(LIB_INSTALL_DIR "${CMAKE_INSTALL_PREFIX}/lib" CACHE PATH "Install path for libraries") + +install( + TARGETS XrdHttpTPC + LIBRARY DESTINATION ${LIB_INSTALL_DIR} +) diff --git a/src/XrdTpc/README.md b/src/XrdTpc/README.md new file mode 100644 index 00000000000..a0845d23646 --- /dev/null +++ b/src/XrdTpc/README.md @@ -0,0 +1,121 @@ +# HTTPS Third Party Copy for XRootD + +The `xrootd-tpc` module provides an implementation of HTTPS third-party-copy +for the HTTPS implementation inside XRootD. + +To enable, set the following in the configuration file: + +``` +http.exthandler xrdtpc libXrdHttpTPC.so +``` + + +## HTTPS TPC technical details. + +Third-party-copy is done using the standard WebDAV copy verb. The actors involved are: + +1. The client orchestrating the transfer. +2. The source server where the resource is read from. +3. The destination server where the resource is written to. + +The client initiates the third party copy by issuing a COPY request to _either_ the source or destination +endpoint. Typically, when this is done, the initial endpoint redirects the client to a second service +(the disk server) that will performs the actual transfer. + +When the client contacts the source server, it should set the `Destination` header so the source knows +where to put the data. Analogously, if the client first contacts the destination server, it should set +the `Source` header. + +For the former case, the interaction looks like this: + +``` +-> COPY /sfn/of/source/replica HTTP/1.1 + Destination: https:///pfn/of/dest/replica +<- HTTP/1.1 302 Found + Location: https:///pfn/of/source/replica +``` + +where `` is the service that will actually perform the transfer. + +The client would follow the redirection and retry the `COPY`: + +``` +-> COPY /sfn/of/source/replica HTTP/1.1 + Destination: https:///pfn/of/dest/replica +<- HTTP/1.1 201 Created + Transfer-Encoding: chunked +``` + +The source server *should* respond with an appropriate status code (such as 201) in a timely manner. +As a TPC can take a significant amount of time, the source server SHOULD NOT wait until the transfer is +finished before responding with a success. In the case when a transfer can be started, it is recommended +that the response be started as soon as possible. + +In the case of a transfer being started (or queued) by the source server, it SHOULD use chunked encoding +and send a multipart response. + +Next `` will connect to the destination endpoint and actually perform the `PUT`: + +``` +-> PUT /pfn/of/destination/replica HTTP/1.1 +<- HTTP/1.1 201 Created +``` + +As the PUT is ongoing, the source disk server should send back a periodic transfer chunk of the following +form: + +``` +Perf Marker + Timestamp: 1360578938 + Stripe Index: 0 + Stripe Bytes Transferred: 49397760 + Total Stripe Count: 1 +End +``` + +Timestamps should be seconds from Unix epoch. It is recommended that the time period between chunks be +less than 30 seconds. + +If the transfer ultimately succeeds, then the last chunk should be of the following form: + +``` +success: Created +``` + +Here, `success` indicates the transfer status: subsequent text is informational for the user. + +Failure of the transfer can be indicated with the prefix `failed` or `failure`: + +``` +failure: Network connection unexpectedly closed. +``` + +Finally, if the source disk server decides to cancel or abort the transfer, use the `aborted` prefix: + +``` +aborted: Transfer took too long. +``` + +There is an analogous case when the client contacts the destination server to perform the `COPY` and +sets the `Source` header. In such a case, the response to the client looks identical but the destination +server will perform a `GET` instead of a `PUT`. + +In some cases, the user may want the server performing the transfer to append additional headers +(such as an HTTP `Authorization` header) to the transfer it initiates. In such case, the client should +utilize the `TransferHeader` mechanism. Add a header of the form: + +``` +TransferHeader
: +``` + +For example, if the client sends the following to the source server as part of its `COPY` request: + +``` +TransferHeaderAuthorization: Basic cGF1bDpwYXNzd29yZA== +``` + +then the source server should set the following header as part of its `PUT` request to the destination: + +``` +Authorization: Basic cGF1bDpwYXNzd29yZA== +``` diff --git a/src/XrdTpc/configure.cpp b/src/XrdTpc/configure.cpp new file mode 100644 index 00000000000..cf9915cdac6 --- /dev/null +++ b/src/XrdTpc/configure.cpp @@ -0,0 +1,182 @@ + +#include "tpc.hh" + +#include +#include + +#include "XrdOuc/XrdOucStream.hh" +#include "XrdOuc/XrdOucPinPath.hh" +#include "XrdSfs/XrdSfsInterface.hh" + +extern XrdSfsFileSystem *XrdSfsGetDefaultFileSystem(XrdSfsFileSystem *native_fs, + XrdSysLogger *lp, + const char *configfn, + XrdOucEnv *EnvInfo); + + +using namespace TPC; + + +static XrdSfsFileSystem *load_sfs(void *handle, bool alt, XrdSysError &log, const std::string &libpath, const char *configfn, XrdOucEnv &myEnv, XrdSfsFileSystem *prior_sfs) { + XrdSfsFileSystem *sfs = nullptr; + if (alt) { + auto ep = (XrdSfsFileSystem *(*)(XrdSfsFileSystem *, XrdSysLogger *, const char *, XrdOucEnv *)) + (dlsym(handle, "XrdSfsGetFileSystem2")); + if (ep == nullptr) { + log.Emsg("Config", "Failed to load XrdSfsGetFileSystem2 from library ", libpath.c_str(), dlerror()); + return nullptr; + } + sfs = ep(prior_sfs, log.logger(), configfn, &myEnv); + } else { + auto ep = (XrdSfsFileSystem *(*)(XrdSfsFileSystem *, XrdSysLogger *, const char *)) + (dlsym(nullptr, "XrdSfsGetFileSystem")); + if (ep == nullptr) { + log.Emsg("Config", "Failed to load XrdSfsGetFileSystem from library ", libpath.c_str(), dlerror()); + return nullptr; + } + sfs = ep(prior_sfs, log.logger(), configfn); + } + if (!sfs) { + log.Emsg("Config", "Failed to initialize filesystem library for TPC handler from ", libpath.c_str()); + return nullptr; + } + return sfs; +} + + +bool TPCHandler::ConfigureFSLib(XrdOucStream &Config, std::string &path1, bool &path1_alt, std::string &path2, bool &path2_alt) { + char *val; + if (!(val = Config.GetWord())) { + m_log.Emsg("Config", "fslib not specified"); + return false; + } + if (!strcmp("throttle", val)) { + path2 = "libXrdThrottle.so"; + if (!(val = Config.GetWord())) { + m_log.Emsg("Config", "fslib throttle target library not specified"); + return false; + } + } + else if (!strcmp("-2", val)) { + path2_alt = true; + if (!(val = Config.GetWord())) { + m_log.Emsg("Config", "fslib library not specified"); + return false; + } + path2 = val; + } + else { + path2 = val; + } + if (!(val = Config.GetWord()) || !strcmp("default", val)) { + if (path2 == "libXrdThrottle.so") { + path1 = "default"; + } else if (!path2.empty()) { + path1 = path2; + path2 = ""; + path1_alt = path2_alt; + } + } else if (!strcmp("-2", val)) { + path1_alt = true; + if (!(val = Config.GetWord())) { + m_log.Emsg("Config", "fslib base library not specified"); + return false; + } + path1 = val; + } else { + path2 = val; + } + return true; +} + +bool TPCHandler::Configure(const char *configfn, XrdOucEnv *myEnv) +{ + XrdOucStream Config(&m_log, getenv("XRDINSTANCE"), myEnv, "=====> "); + + std::string authLib; + std::string authLibParms; + int cfgFD = open(configfn, O_RDONLY, 0); + if (cfgFD < 0) { + m_log.Emsg("Config", errno, "open config file", configfn); + return false; + } + Config.Attach(cfgFD); + const char *val; + std::string path2, path1 = "default"; + bool path1_alt = false, path2_alt = false; + while ((val = Config.GetMyFirstWord())) { + if (!strcmp("xrootd.fslib", val)) { + if (!ConfigureFSLib(Config, path1, path1_alt, path2, path2_alt)) { + Config.Close(); + m_log.Emsg("Config", "Failed to parse the xrootd.fslib directive"); + return false; + } + m_log.Emsg("Config", "xrootd.fslib line successfully processed by TPC handler/"); + } else if (!strcmp("http.desthttps", val)) { + if (!(val = Config.GetWord())) { + Config.Close(); + m_log.Emsg("Config", "http.desthttps value not specified"); + return false; + } + if (!strcmp("1", val) || !strcasecmp("yes", val) || !strcasecmp("true", val)) { + m_desthttps = true; + } else if (!strcmp("0", val) || !strcasecmp("no", val) || !strcasecmp("false", val)) { + m_desthttps = false; + } else { + Config.Close(); + m_log.Emsg("Config", "https.desthttps value is invalid", val); + return false; + } + } else if (!strcmp("http.cadir", val)) { + if (!(val = Config.GetWord())) { + Config.Close(); + m_log.Emsg("Config", "http.cadir value not specified"); + return false; + } + m_cadir = val; + } + } + Config.Close(); + + XrdSfsFileSystem *base_sfs = nullptr; + if (path1 == "default") { + m_log.Emsg("Config", "Loading the default filesystem"); + base_sfs = XrdSfsGetDefaultFileSystem(nullptr, m_log.logger(), configfn, myEnv); + m_log.Emsg("Config", "Finished loading the default filesystem"); + } else { + char resolvePath[2048]; + bool usedAltPath{true}; + if (!XrdOucPinPath(path1.c_str(), usedAltPath, resolvePath, 2048)) { + m_log.Emsg("Config", "Failed to locate appropriately versioned base filesystem library for ", path1.c_str()); + return false; + } + m_handle_base = dlopen(resolvePath, RTLD_LOCAL|RTLD_NOW); + if (m_handle_base == nullptr) { + m_log.Emsg("Config", "Failed to base plugin ", resolvePath, dlerror()); + return false; + } + base_sfs = load_sfs(m_handle_base, path1_alt, m_log, path1, configfn, *myEnv, nullptr); + } + if (!base_sfs) { + m_log.Emsg("Config", "Failed to initialize filesystem library for TPC handler from ", path1.c_str()); + return false; + } + XrdSfsFileSystem *chained_sfs = nullptr; + if (!path2.empty()) { + char resolvePath[2048]; + bool usedAltPath{true}; + if (!XrdOucPinPath(path2.c_str(), usedAltPath, resolvePath, 2048)) { + m_log.Emsg("Config", "Failed to locate appropriately versioned chained filesystem library for ", path2.c_str()); + return false; + } + m_handle_chained = dlopen(resolvePath, RTLD_LOCAL|RTLD_NOW); + if (m_handle_chained == nullptr) { + m_log.Emsg("Config", "Failed to chained plugin ", resolvePath, dlerror()); + return false; + } + chained_sfs = load_sfs(m_handle_chained, path2_alt, m_log, path2, configfn, *myEnv, base_sfs); + } + m_sfs.reset(chained_sfs ? chained_sfs : base_sfs); + m_log.Emsg("Config", "Successfully configured the filesystem object for TPC handler"); + return true; +} diff --git a/src/XrdTpc/export-lib-symbols b/src/XrdTpc/export-lib-symbols new file mode 100644 index 00000000000..4e6f0d221d8 --- /dev/null +++ b/src/XrdTpc/export-lib-symbols @@ -0,0 +1,7 @@ +{ +global: + XrdHttpGetExtHandler*; + +local: + *; +}; diff --git a/src/XrdTpc/multistream.cpp b/src/XrdTpc/multistream.cpp new file mode 100644 index 00000000000..2779d143f19 --- /dev/null +++ b/src/XrdTpc/multistream.cpp @@ -0,0 +1,333 @@ +/** + * Implementation of multi-stream HTTP transfers for the TPCHandler + */ + +#ifdef XRD_CHUNK_RESP + +#include "tpc.hh" +#include "state.hh" + +#include "XrdSys/XrdSysError.hh" + +#include + +#include +#include + +using namespace TPC; + +class CurlHandlerSetupError : public std::runtime_error { +public: + CurlHandlerSetupError(const std::string &msg) : + std::runtime_error(msg) + {} + virtual ~CurlHandlerSetupError() {} +}; + +namespace { +class MultiCurlHandler { +public: + MultiCurlHandler(std::vector &states) : + m_handle(curl_multi_init()), + m_states(states) + { + if (m_handle == nullptr) { + throw CurlHandlerSetupError("Failed to initialize a libcurl multi-handle"); + } + m_avail_handles.reserve(states.size()); + m_active_handles.reserve(states.size()); + for (State &state : states) { + m_avail_handles.push_back(state.GetHandle()); + } + } + + ~MultiCurlHandler() + { + if (!m_handle) {return;} + for (CURL * easy_handle : m_active_handles) { + curl_multi_remove_handle(m_handle, easy_handle); + curl_easy_cleanup(easy_handle); + } + for (auto & easy_handle : m_avail_handles) { + curl_easy_cleanup(easy_handle); + } + curl_multi_cleanup(m_handle); + } + + MultiCurlHandler(const MultiCurlHandler &) = delete; + + CURLM *Get() const {return m_handle;} + + void FinishCurlXfer(CURL *curl) { + CURLMcode mres = curl_multi_remove_handle(m_handle, curl); + if (mres) { + std::stringstream ss; + ss << "Failed to remove transfer from set: " + << curl_multi_strerror(mres); + throw std::runtime_error(ss.str()); + } + for (auto &state : m_states) { + if (curl == state.GetHandle()) { + state.ResetAfterRequest(); + break; + } + } + for (auto iter = m_active_handles.begin(); + iter != m_active_handles.end(); + ++iter) + { + if (*iter == curl) { + m_active_handles.erase(iter); + break; + } + } + m_avail_handles.push_back(curl); + } + + off_t StartTransfers(off_t current_offset, off_t content_length, size_t block_size, + int &running_handles) { + bool started_new_xfer = false; + do { + size_t xfer_size = std::min(content_length - current_offset, static_cast(block_size)); + if (xfer_size == 0) {return current_offset;} + if (!(started_new_xfer = StartTransfer(current_offset, xfer_size))) { + break; + } else { + running_handles += 1; + } + current_offset += xfer_size; + } while (true); + return current_offset; + } + +private: + + bool StartTransfer(off_t offset, size_t size) { + if (!CanStartTransfer()) {return false;} + for (auto &handle : m_avail_handles) { + for (auto &state : m_states) { + if (state.GetHandle() == handle) { // This state object represents an idle handle. + state.SetTransferParameters(offset, size); + ActivateHandle(state); + return true; + } + } + } + return false; + } + + void ActivateHandle(State &state) { + CURL *curl = state.GetHandle(); + m_active_handles.push_back(curl); + CURLMcode mres; + mres = curl_multi_add_handle(m_handle, curl); + if (mres) { + std::stringstream ss; + ss << "Failed to add transfer to libcurl multi-handle" + << curl_multi_strerror(mres); + throw std::runtime_error(ss.str()); + } + for (auto iter = m_avail_handles.begin(); + iter != m_avail_handles.end(); + ++iter) + { + if (*iter == curl) { + m_avail_handles.erase(iter); + break; + } + } + } + + bool CanStartTransfer() const { + size_t idle_handles = m_avail_handles.size(); + size_t transfer_in_progress = 0; + for (auto &state : m_states) { + for (const auto &handle : m_active_handles) { + if (handle == state.GetHandle()) { + transfer_in_progress += state.BodyTransferInProgress(); + break; + } + } + } + if (!idle_handles) { + return false; + } + ssize_t available_buffers = m_states[0].AvailableBuffers(); + // To be conservative, set aside buffers for any transfers that have been activated + // but don't have their first responses back yet. + available_buffers -= (m_active_handles.size() - transfer_in_progress); + return available_buffers > 0; + } + + CURLM *m_handle; + std::vector m_avail_handles; + std::vector m_active_handles; + std::vector &m_states; +}; +} + + +int TPCHandler::RunCurlWithStreams(XrdHttpExtReq &req, State &state, + const char *log_prefix, size_t streams) +try +{ + int result; + bool success; + CURL *curl = state.GetHandle(); + if ((result = DetermineXferSize(curl, req, state, success)) || !success) { + return result; + } + off_t content_size = state.GetContentLength(); + off_t current_offset = 0; + + { + std::stringstream ss; + ss << "Successfully determined remote size for pull request: " << content_size; + m_log.Emsg("ProcessPullReq", ss.str().c_str()); + } + state.ResetAfterRequest(); + + std::vector handles; + handles.reserve(streams); + handles.emplace_back(std::move(state)); + for (size_t idx = 1; idx < streams; idx++) { + handles.emplace_back(handles[0].Duplicate()); // Makes a duplicate of the original state + } + + // Create the multi-handle and add in the current transfer to it. + MultiCurlHandler mch(handles); + CURLM *multi_handle = mch.Get(); + + // Start response to client prior to the first call to curl_multi_perform + int retval = req.StartChunkedResp(201, "Created", "Content-Type: text/plain"); + if (retval) { + return retval; + } + + // Start assigning transfers + int running_handles = 0; + current_offset = mch.StartTransfers(current_offset, content_size, m_block_size, running_handles); + + // Transfer loop: use curl to actually run the transfer, but periodically + // interrupt things to send back performance updates to the client. + time_t last_marker = 0; + CURLcode res = static_cast(-1); + CURLMcode mres; + do { + time_t now = time(NULL); + time_t next_marker = last_marker + m_marker_period; + if (now >= next_marker) { + if (SendPerfMarker(req, current_offset)) { + return -1; + } + last_marker = now; + } + + mres = curl_multi_perform(multi_handle, &running_handles); + if (mres == CURLM_CALL_MULTI_PERFORM) { + // curl_multi_perform should be called again immediately. On newer + // versions of curl, this is no longer used. + continue; + } else if (mres != CURLM_OK) { + break; + } + + // Harvest any messages, looking for CURLMSG_DONE. + CURLMsg *msg; + do { + int msgq = 0; + msg = curl_multi_info_read(multi_handle, &msgq); + if (msg && (msg->msg == CURLMSG_DONE)) { + CURL *easy_handle = msg->easy_handle; + mch.FinishCurlXfer(easy_handle); + res = msg->data.result; + // If any requests fail, cut off the entire transfer. + if (res != CURLE_OK) { + break; + } + } + } while (msg); + if (res != -1 && res != CURLE_OK) { + break; + } + + if (running_handles < static_cast(streams)) { + // Issue new transfers if there is still pending work to do. + // Otherwise, continue running until there are no handles left. + if (current_offset != content_size) { + current_offset = mch.StartTransfers(current_offset, content_size, + m_block_size, running_handles); + } else if (running_handles == 0) { + break; + } + } + + int64_t max_sleep_time = next_marker - time(NULL); + if (max_sleep_time <= 0) { + continue; + } + int fd_count; + mres = curl_multi_wait(multi_handle, NULL, 0, max_sleep_time*1000, &fd_count); + if (mres != CURLM_OK) { + break; + } + } while (running_handles); + + if (mres != CURLM_OK) { + std::stringstream ss; + ss << "Internal libcurl multi-handle error: " + << curl_multi_strerror(mres); + throw std::runtime_error(ss.str()); + } + + // Harvest any messages, looking for CURLMSG_DONE. + CURLMsg *msg; + do { + int msgq = 0; + msg = curl_multi_info_read(multi_handle, &msgq); + if (msg && (msg->msg == CURLMSG_DONE)) { + CURL *easy_handle = msg->easy_handle; + mch.FinishCurlXfer(easy_handle); + res = msg->data.result; // Transfer result will be examined below. + } + } while (msg); + + if (res == -1) { // No transfers returned?!? + throw std::runtime_error("Internal state error in libcurl"); + } + + // Generate the final response back to the client. + std::stringstream ss; + if (res != CURLE_OK) { + m_log.Emsg(log_prefix, "request failed when processing", curl_easy_strerror(res)); + ss << "failure: " << curl_easy_strerror(res); + } else if (current_offset != content_size) { + ss << "failure: Internal logic error led to early abort"; + m_log.Emsg(log_prefix, "Internal logic error led to early abort"); + } else if (state.GetStatusCode() >= 400) { + ss << "failure: Remote side failed with status code " << state.GetStatusCode(); + m_log.Emsg(log_prefix, "Remote server failed request", ss.str().c_str()); + } else { + ss << "success: Created"; + } + + if ((retval = req.ChunkResp(ss.str().c_str(), 0))) { + return retval; + } + return req.ChunkResp(nullptr, 0); +} +catch (CurlHandlerSetupError e) { + m_log.Emsg(log_prefix, e.what()); + return req.SendSimpleResp(500, nullptr, nullptr, e.what(), 0); +} catch (std::runtime_error e) { + m_log.Emsg(log_prefix, e.what()); + std::stringstream ss; + ss << "failure: " << e.what(); + int retval; + if ((retval = req.ChunkResp(ss.str().c_str(), 0))) { + return retval; + } + return req.ChunkResp(nullptr, 0); +} + +#endif // XRD_CHUNK_RESP diff --git a/src/XrdTpc/state.cpp b/src/XrdTpc/state.cpp new file mode 100644 index 00000000000..c2c234ebf55 --- /dev/null +++ b/src/XrdTpc/state.cpp @@ -0,0 +1,231 @@ + +#include +#include +#include + +#include "XrdVersion.hh" +#include "XrdHttp/XrdHttpExtHandler.hh" +#include "XrdSfs/XrdSfsInterface.hh" + +#include + +#include "state.hh" +#include "stream.hh" + +using namespace TPC; + +State::~State() { + if (m_headers) { + curl_slist_free_all(m_headers); + m_headers = nullptr; + if (m_curl) {curl_easy_setopt(m_curl, CURLOPT_HTTPHEADER, m_headers);} + } +} + +State::State(State && other) noexcept : + m_push(other.m_push), + m_recv_status_line(other.m_recv_status_line), + m_recv_all_headers(other.m_recv_all_headers), + m_offset(other.m_offset), + m_start_offset(other.m_start_offset), + m_status_code(other.m_status_code), + m_content_length(other.m_content_length), + m_stream(other.m_stream), + m_curl(other.m_curl), + m_headers(other.m_headers), + m_headers_copy(std::move(other.m_headers_copy)), + m_resp_protocol(std::move(m_resp_protocol)) +{ + curl_easy_setopt(m_curl, CURLOPT_HEADERDATA, this); + if (m_push) { + curl_easy_setopt(m_curl, CURLOPT_READDATA, this); + } else { + curl_easy_setopt(m_curl, CURLOPT_WRITEDATA, this); + } + other.m_headers_copy.clear(); + other.m_curl = nullptr; + other.m_headers = nullptr; +} + +bool State::InstallHandlers(CURL *curl) { + curl_easy_setopt(curl, CURLOPT_USERAGENT, "xrootd-tpc/" XrdVERSION); + curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, &State::HeaderCB); + curl_easy_setopt(curl, CURLOPT_HEADERDATA, this); + if (m_push) { + curl_easy_setopt(curl, CURLOPT_UPLOAD, 1); + curl_easy_setopt(curl, CURLOPT_READFUNCTION, &State::ReadCB); + curl_easy_setopt(curl, CURLOPT_READDATA, this); + struct stat buf; + if (SFS_OK == m_stream.Stat(&buf)) { + curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, buf.st_size); + } + } else { + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &State::WriteCB); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, this); + } + curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); + + // Require a minimum speed from the transfer: must move at least 1MB every 2 minutes + // (roughly 8KB/s). + curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, 2*60); + curl_easy_setopt(curl, CURLOPT_LOW_SPEED_LIMIT, 1024*1024); + return true; +} + +/** + * Handle the 'Copy-Headers' feature + */ +void State::CopyHeaders(XrdHttpExtReq &req) { + struct curl_slist *list = nullptr; + for (auto &hdr : req.headers) { + if (hdr.first == "Copy-Header") { + list = curl_slist_append(list, hdr.second.c_str()); + m_headers_copy.emplace_back(hdr.second); + } + // Note: len("TransferHeader") == 14 + if (!hdr.first.compare(0, 14, "TransferHeader")) { + std::stringstream ss; + ss << hdr.first.substr(14) << ": " << hdr.second; + list = curl_slist_append(list, ss.str().c_str()); + m_headers_copy.emplace_back(ss.str()); + } + } + if (list != nullptr) { + curl_easy_setopt(m_curl, CURLOPT_HTTPHEADER, list); + m_headers = list; + } +} + +void State::ResetAfterRequest() { + m_offset = 0; + m_status_code = -1; + m_content_length = -1; + m_recv_all_headers = false; + m_recv_status_line = false; +} + +size_t State::HeaderCB(char *buffer, size_t size, size_t nitems, void *userdata) +{ + State *obj = static_cast(userdata); + std::string header(buffer, size*nitems); + return obj->Header(header); +} + +int State::Header(const std::string &header) { + //printf("Recieved remote header (%d, %d): %s", m_recv_all_headers, m_recv_status_line, header.c_str()); + if (m_recv_all_headers) { // This is the second request -- maybe processed a redirect? + m_recv_all_headers = false; + m_recv_status_line = false; + } + if (!m_recv_status_line) { + std::stringstream ss(header); + std::string item; + if (!std::getline(ss, item, ' ')) return 0; + m_resp_protocol = item; + //printf("\n\nResponse protocol: %s\n", m_resp_protocol.c_str()); + if (!std::getline(ss, item, ' ')) return 0; + try { + m_status_code = std::stol(item); + } catch (...) { + return 0; + } + m_recv_status_line = true; + } else if (header.size() == 0 || header == "\n") { + m_recv_all_headers = true; + } + else if (header != "\r\n") { + // Parse the header + std::size_t found = header.find(":"); + if (found != std::string::npos) { + std::string header_name = header.substr(0, found); + std::transform(header_name.begin(), header_name.end(), header_name.begin(), ::tolower); + std::string header_value = header.substr(found+1); + if (header_name == "content-length") + { + try { + m_content_length = std::stoll(header_value); + } catch (...) { + // Header unparseable -- not a great sign, fail request. + //printf("Content-length header unparseable\n"); + return 0; + } + } + } else { + // Non-empty header that isn't the status line, but no ':' present -- + // malformed request? + //printf("Malformed header: %s\n", header.c_str()); + return 0; + } + } + return header.size(); +} + +size_t State::WriteCB(void *buffer, size_t size, size_t nitems, void *userdata) { + State *obj = static_cast(userdata); + if (obj->GetStatusCode() < 0) { + return 0; + } // malformed request - got body before headers. + if (obj->GetStatusCode() >= 400) {return 0;} // Status indicates failure. + return obj->Write(static_cast(buffer), size*nitems); +} + +int State::Write(char *buffer, size_t size) { + int retval = m_stream.Write(m_start_offset + m_offset, buffer, size); + if (retval == SFS_ERROR) { + return -1; + } + m_offset += retval; + return retval; +} + +size_t State::ReadCB(void *buffer, size_t size, size_t nitems, void *userdata) { + State *obj = static_cast(userdata); + if (obj->GetStatusCode() < 0) {return 0;} // malformed request - got body before headers. + if (obj->GetStatusCode() >= 400) {return 0;} // Status indicates failure. + return obj->Read(static_cast(buffer), size*nitems); +} + +int State::Read(char *buffer, size_t size) { + int retval = m_stream.Read(m_start_offset + m_offset, buffer, size); + if (retval == SFS_ERROR) { + return -1; + } + m_offset += retval; + //printf("Read a total of %ld bytes.\n", m_offset); + return retval; +} + +State State::Duplicate() { + CURL *curl = curl_easy_duphandle(m_curl); + if (!curl) { + throw std::runtime_error("Failed to duplicate existing curl handle."); + } + + State state(0, m_stream, curl, m_push); + + if (m_headers) { + state.m_headers_copy.reserve(m_headers_copy.size()); + for (auto &header : m_headers_copy) { + state.m_headers = curl_slist_append(state.m_headers, header.c_str()); + state.m_headers_copy.push_back(header); + } + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, nullptr); + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, state.m_headers); + } + + return std::move(state); +} + +void State::SetTransferParameters(off_t offset, size_t size) { + m_start_offset = offset; + m_offset = 0; + m_content_length = size; + std::stringstream ss; + ss << offset << "-" << (offset+size-1); + curl_easy_setopt(m_curl, CURLOPT_RANGE, ss.str().c_str()); +} + +int State::AvailableBuffers() const +{ + return m_stream.AvailableBuffers(); +} diff --git a/src/XrdTpc/state.hh b/src/XrdTpc/state.hh new file mode 100644 index 00000000000..5d52ec39cdd --- /dev/null +++ b/src/XrdTpc/state.hh @@ -0,0 +1,88 @@ +/** + * state.hh: + * + * Helper class for managing the state of a single TPC request. + */ + +#include +#include + +// Forward dec'ls +class XrdSfsFile; +class XrdHttpExtReq; +typedef void CURL; + +namespace TPC { +class Stream; + +class State { +public: + + // Note that we are "borrowing" a reference to the curl handle; + // it is not owned / freed by the State object. However, we use it + // as if there's only one handle per State. + State (off_t start_offset, Stream &stream, CURL *curl, bool push) : + m_push(push), + m_start_offset(start_offset), + m_stream(stream), + m_curl(curl) + { + InstallHandlers(curl); + } + + ~State(); + + void SetTransferParameters(off_t offset, size_t size); + + void CopyHeaders(XrdHttpExtReq &req); + + off_t BytesTransferred() const {return m_offset;} + + off_t GetContentLength() const {return m_content_length;} + + int GetStatusCode() const {return m_status_code;} + + void ResetAfterRequest(); + + CURL *GetHandle() const {return m_curl;} + + int AvailableBuffers() const; + + // Returns true if at least one byte of the response has been received, + // but not the entire contents of the response. + bool BodyTransferInProgress() const {return m_offset && (m_offset != m_content_length);} + + // Duplicate the current state; all settings are copied over, but those + // related to the transient state are reset as if from a constructor. + State Duplicate(); + + State(const State&) = delete; + State(State &&) noexcept; + +private: + bool InstallHandlers(CURL *curl); + + // libcurl callback functions, along with the corresponding class methods. + static size_t HeaderCB(char *buffer, size_t size, size_t nitems, + void *userdata); + int Header(const std::string &header); + static size_t WriteCB(void *buffer, size_t size, size_t nitems, void *userdata); + int Write(char *buffer, size_t size); + static size_t ReadCB(void *buffer, size_t size, size_t nitems, void *userdata); + int Read(char *buffer, size_t size); + + bool m_push{true}; // whether we are transferring in "push-mode" + bool m_recv_status_line{false}; // whether we have received a status line in the response from the remote host. + bool m_recv_all_headers{false}; // true if we have seen the end of headers. + off_t m_offset{0}; // number of bytes we have received. + off_t m_start_offset{0}; // offset where we started in the file. + int m_status_code{-1}; // status code from HTTP response. + off_t m_content_length{-1}; // value of Content-Length header, if we received one. + Stream &m_stream; // stream corresponding to this transfer. + CURL *m_curl{nullptr}; // libcurl handle + struct curl_slist *m_headers{nullptr}; // any headers we set as part of the libcurl request. + std::vector m_headers_copy; // Copies of custom headers. + std::string m_resp_protocol; // Response protocol in the HTTP status line. +}; + +}; diff --git a/src/XrdTpc/stream.cpp b/src/XrdTpc/stream.cpp new file mode 100644 index 00000000000..b2e55304330 --- /dev/null +++ b/src/XrdTpc/stream.cpp @@ -0,0 +1,90 @@ + +#include "stream.hh" + +#include "XrdSfs/XrdSfsInterface.hh" + +using namespace TPC; + +Stream::~Stream() +{ + m_fh->close(); +} + + +int +Stream::Stat(struct stat* buf) +{ + return m_fh->stat(buf); +} + +int +Stream::Write(off_t offset, const char *buf, size_t size) +{ + bool buffer_accepted = false; + int retval = size; + if (offset < m_offset) { + return SFS_ERROR; + } + if (offset == m_offset) { + retval = m_fh->write(offset, buf, size); + buffer_accepted = true; + if (retval != SFS_ERROR) { + m_offset += retval; + } + // If there are no in-use buffers, then we don't need to + // do any accounting. + if (m_avail_count == m_buffers.size()) { + return retval; + } + } + // Even if we already accepted the current data, always + // iterate through available buffers and try to write as + // much out to disk as possible. + Entry *avail_entry; + bool buffer_was_written; + size_t avail_count = 0; + do { + avail_count = 0; + avail_entry = nullptr; + buffer_was_written = false; + for (Entry &entry : m_buffers) { + // Always try to dump from memory. + if (entry.Write(*this) > 0) { + buffer_was_written = true; + } + if (entry.Available()) { // Empty buffer + if (!avail_entry) {avail_entry = &entry;} + avail_count ++; + } + else if (!buffer_accepted && entry.Accept(offset, buf, size)) { + buffer_accepted = true; + } + } + } while ((avail_count != m_buffers.size()) && buffer_was_written); + m_avail_count = avail_count; + + if (!buffer_accepted) { // No place for this data in allocated buffers + if (!avail_entry) { // No available buffers to allocate. + return SFS_ERROR; + } + if (!avail_entry->Accept(offset, buf, size)) { // Empty buffer cannot accept?!? + return SFS_ERROR; + } + m_avail_count --; + } + + // If we have low buffer occupancy, then release memory. + if ((m_buffers.size() > 2) && (m_avail_count * 2 > m_buffers.size())) { + for (Entry &entry : m_buffers) { + entry.ShrinkIfUnused(); + } + } + + return retval; +} + +int +Stream::Read(off_t offset, char *buf, size_t size) +{ + return m_fh->read(offset, buf, size); +} diff --git a/src/XrdTpc/stream.hh b/src/XrdTpc/stream.hh new file mode 100644 index 00000000000..f123ec42301 --- /dev/null +++ b/src/XrdTpc/stream.hh @@ -0,0 +1,113 @@ + +/** + * The "stream" interface is a simple abstraction of a file handle. + * + * The abstraction layer is necessary to do the necessary buffering + * of multi-stream writes where the underlying filesystem only + * supports single-stream writes. + */ + +#include +#include + +#include + +struct stat; + +class XrdSfsFile; + +namespace TPC { +class Stream { +public: + Stream(std::unique_ptr fh, size_t max_blocks, size_t buffer_size) + : m_avail_count(max_blocks), + m_fh(std::move(fh)) + { + //m_buffers.reserve(max_blocks); + for (size_t idx=0; idx < max_blocks; idx++) { + m_buffers.emplace_back(buffer_size); + } + } + + ~Stream(); + + int Stat(struct stat *); + + int Read(off_t offset, char *buffer, size_t size); + + int Write(off_t offset, const char *buffer, size_t size); + + size_t AvailableBuffers() const {return m_avail_count;} + +private: + + class Entry { + public: + Entry(size_t capacity) : + m_capacity(capacity) + {} + + Entry(const Entry&) = delete; + Entry(Entry&&) = default; + + bool Available() const {return m_offset == -1;} + + int Write(Stream &stream) { + if (Available() || !CanWrite(stream)) {return 0;} + // Currently, only full writes are accepted. + int size_desired = m_size; + int retval = stream.Write(m_offset, &m_buffer[0], size_desired); + m_size = 0; + m_offset = -1; + if (retval != size_desired) { + return -1; + } + return retval; + } + + bool Accept(off_t offset, const char *buf, size_t size) { + // Validate acceptance criteria. + if ((m_offset != -1) && (offset != m_offset + static_cast(m_size))) { + return false; + } + if (size > m_capacity - m_size) { + return false; + } + + // Inflate the underlying buffer if needed. + ssize_t new_bytes_needed = (m_size + size) - m_buffer.capacity(); + if (new_bytes_needed > 0) { + m_buffer.reserve(m_capacity); + } + + // Finally, do the copy. + memcpy(&m_buffer[0] + m_size, buf, size); + m_size += size; + if (m_offset == -1) { + m_offset = offset; + } + return true; + } + + void ShrinkIfUnused() { + if (!Available()) {return;} + m_buffer.shrink_to_fit(); + } + + private: + bool CanWrite(Stream &stream) const { + return (m_size > 0) && (m_offset == stream.m_offset); + } + + off_t m_offset{-1}; // Offset within file that m_buffer[0] represents. + const size_t m_capacity; + size_t m_size{0}; // Number of bytes held in buffer. + std::vector m_buffer; + }; + + size_t m_avail_count; + std::unique_ptr m_fh; + off_t m_offset{0}; + std::vector m_buffers; +}; +} diff --git a/src/XrdTpc/tpc.cpp b/src/XrdTpc/tpc.cpp new file mode 100644 index 00000000000..d83fa8b1967 --- /dev/null +++ b/src/XrdTpc/tpc.cpp @@ -0,0 +1,518 @@ + +#include "XrdHttp/XrdHttpExtHandler.hh" +#include "XrdOuc/XrdOucEnv.hh" +#include "XrdSec/XrdSecEntity.hh" +#include "XrdSfs/XrdSfsInterface.hh" +#include "XrdVersion.hh" + +#include + +#include +#include + +#include +#include +#include +#include + +#include "state.hh" +#include "stream.hh" +#include "tpc.hh" + +using namespace TPC; + +std::atomic TPCHandler::m_monid{0}; + +XrdVERSIONINFO(XrdHttpGetExtHandler, HttpTPC); + + +static char *quote(const char *str) { + int l = strlen(str); + char *r = (char *) malloc(l*3 + 1); + r[0] = '\0'; + int i, j = 0; + + for (i = 0; i < l; i++) { + char c = str[i]; + + switch (c) { + case ' ': + strcpy(r + j, "%20"); + j += 3; + break; + case '[': + strcpy(r + j, "%5B"); + j += 3; + break; + case ']': + strcpy(r + j, "%5D"); + j += 3; + break; + case ':': + strcpy(r + j, "%3A"); + j += 3; + break; + case '/': + strcpy(r + j, "%2F"); + j += 3; + break; + default: + r[j++] = c; + } + } + + r[j] = '\0'; + + return r; +} + + +bool TPCHandler::MatchesPath(const char *verb, const char *path) { + return !strcmp(verb, "COPY") || !strcmp(verb, "OPTIONS"); +} + +static std::string PrepareURL(const std::string &input) { + if (!strncmp(input.c_str(), "davs://", 7)) { + return "https://" + input.substr(7); + } + return input; +} + +int TPCHandler::ProcessReq(XrdHttpExtReq &req) { + if (req.verb == "OPTIONS") { + return ProcessOptionsReq(req); + } + auto header = req.headers.find("Source"); + if (header != req.headers.end()) { + std::string src = PrepareURL(header->second); + m_log.Emsg("ProcessReq", "Pull request from", src.c_str()); + return ProcessPullReq(src, req); + } + header = req.headers.find("Destination"); + if (header != req.headers.end()) { + return ProcessPushReq(header->second, req); + } + m_log.Emsg("ProcessReq", "COPY verb requested but no source or destination specified."); + return req.SendSimpleResp(400, NULL, NULL, "No Source or Destination specified", 0); +} + +TPCHandler::~TPCHandler() { + m_sfs = nullptr; // NOTE: must delete the SFS here as we may unload the destructor from memory below! + if (m_handle_base) { + dlclose(m_handle_base); + m_handle_base = nullptr; + } + if (m_handle_chained) { + dlclose(m_handle_chained); + m_handle_chained = nullptr; + } +} + +TPCHandler::TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv) : + m_log(*log) +{ + if (!Configure(config, myEnv)) { + throw std::runtime_error("Failed to configure the HTTP third-party-copy handler."); + } +} + +/** + * Handle the OPTIONS verb as we have added a new one... + */ +int TPCHandler::ProcessOptionsReq(XrdHttpExtReq &req) { + return req.SendSimpleResp(200, NULL, (char *) "DAV: 1\r\nDAV: \r\nAllow: HEAD,GET,PUT,PROPFIND,DELETE,OPTIONS,COPY", NULL, 0); +} + +std::string TPCHandler::GetAuthz(XrdHttpExtReq &req) { + std::string authz; + auto authz_header = req.headers.find("Authorization"); + if (authz_header != req.headers.end()) { + char * quoted_url = quote(authz_header->second.c_str()); + std::stringstream ss; + ss << "authz=" << quoted_url; + free(quoted_url); + authz = ss.str(); + } + return authz; +} + +int TPCHandler::RedirectTransfer(XrdHttpExtReq &req, XrdOucErrInfo &error) { + int port; + const char *host = error.getErrText(port); + if ((host == nullptr) || (*host == '\0') || (port == 0)) { + char msg[] = "Internal error: redirect without hostname"; + return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); + } + std::stringstream ss; + ss << "Location: http" << (m_desthttps ? "s" : "") << "://" << host << ":" << port << "/" << req.resource; + return req.SendSimpleResp(307, nullptr, const_cast(ss.str().c_str()), nullptr, 0); +} + +int TPCHandler::OpenWaitStall(XrdSfsFile &fh, const std::string &resource, + int mode, int openMode, const XrdSecEntity &sec, + const std::string &authz) +{ + int open_result; + while (1) { + open_result = fh.open(resource.c_str(), mode, openMode, &sec, + authz.empty() ? nullptr : authz.c_str()); + if ((open_result == SFS_STALL) || (open_result == SFS_STARTED)) { + int secs_to_stall = fh.error.getErrInfo(); + if (open_result == SFS_STARTED) {secs_to_stall = secs_to_stall/2 + 5;} + sleep(secs_to_stall); + } + break; + } + return open_result; +} + +#ifdef XRD_CHUNK_RESP +/** + * Determine size at remote end. + */ +int TPCHandler::DetermineXferSize(CURL *curl, XrdHttpExtReq &req, State &state, + bool &success) { + success = false; + curl_easy_setopt(curl, CURLOPT_NOBODY, 1); + CURLcode res; + res = curl_easy_perform(curl); + if (res == CURLE_HTTP_RETURNED_ERROR) { + m_log.Emsg("DetermineXferSize", "Remote server failed request", curl_easy_strerror(res)); + curl_easy_cleanup(curl); + return req.SendSimpleResp(500, nullptr, nullptr, const_cast(curl_easy_strerror(res)), 0); + } else if (state.GetStatusCode() >= 400) { + std::stringstream ss; + ss << "Remote side failed with status code " << state.GetStatusCode(); + m_log.Emsg("DetermineXferSize", "Remote server failed request", ss.str().c_str()); + curl_easy_cleanup(curl); + return req.SendSimpleResp(500, nullptr, nullptr, const_cast(ss.str().c_str()), 0); + } else if (res) { + m_log.Emsg("DetermineXferSize", "Curl failed", curl_easy_strerror(res)); + char msg[] = "Unknown internal transfer failure"; + curl_easy_cleanup(curl); + return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); + } + curl_easy_setopt(curl, CURLOPT_NOBODY, 0); + success = true; + return 0; +} + +int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, off_t bytes_transferred) { + std::stringstream ss; + const std::string crlf = "\n"; + ss << "Perf Marker" << crlf; + ss << "Timestamp: " << time(NULL) << crlf; + ss << "Stripe Index: 0" << crlf; + ss << "Stripe Bytes Transferred: " << bytes_transferred << crlf; + ss << "Total Stripe Count: 1" << crlf; + ss << "End" << crlf; + + return req.ChunkResp(ss.str().c_str(), 0); +} + +int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state, + const char *log_prefix) +{ + // Create the multi-handle and add in the current transfer to it. + CURLM *multi_handle = curl_multi_init(); + if (!multi_handle) { + m_log.Emsg(log_prefix, "Failed to initialize a libcurl multi-handle"); + char msg[] = "Failed to initialize internal server memory"; + curl_easy_cleanup(curl); + return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); + } + + CURLMcode mres; + mres = curl_multi_add_handle(multi_handle, curl); + if (mres) { + m_log.Emsg(log_prefix, "Failed to add transfer to libcurl multi-handle", + curl_multi_strerror(mres)); + char msg[] = "Failed to initialize internal server handle"; + curl_easy_cleanup(curl); + curl_multi_cleanup(multi_handle); + return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); + } + + // Start response to client prior to the first call to curl_multi_perform + int retval = req.StartChunkedResp(201, "Created", "Content-Type: text/plain"); + if (retval) { + curl_easy_cleanup(curl); + curl_multi_cleanup(multi_handle); + return retval; + } + + // Transfer loop: use curl to actually run the transfer, but periodically + // interrupt things to send back performance updates to the client. + int running_handles = 1; + time_t last_marker = 0; + CURLcode res = static_cast(-1); + do { + time_t now = time(NULL); + time_t next_marker = last_marker + m_marker_period; + if (now >= next_marker) { + if (SendPerfMarker(req, state.BytesTransferred())) { + curl_multi_remove_handle(multi_handle, curl); + curl_easy_cleanup(curl); + curl_multi_cleanup(multi_handle); + return -1; + } + last_marker = now; + } + mres = curl_multi_perform(multi_handle, &running_handles); + if (mres == CURLM_CALL_MULTI_PERFORM) { + // curl_multi_perform should be called again immediately. On newer + // versions of curl, this is no longer used. + continue; + } else if (mres != CURLM_OK) { + break; + } else if (running_handles == 0) { + break; + } + //printf("There are %d running handles\n", running_handles); + + // Harvest any messages, looking for CURLMSG_DONE. + CURLMsg *msg; + do { + int msgq = 0; + msg = curl_multi_info_read(multi_handle, &msgq); + if (msg && (msg->msg == CURLMSG_DONE)) { + CURL *easy_handle = msg->easy_handle; + res = msg->data.result; + curl_multi_remove_handle(multi_handle, easy_handle); + curl_easy_cleanup(easy_handle); + } + } while (msg); + + int64_t max_sleep_time = next_marker - time(NULL); + if (max_sleep_time <= 0) { + continue; + } + int fd_count; + mres = curl_multi_wait(multi_handle, NULL, 0, max_sleep_time*1000, &fd_count); + if (mres != CURLM_OK) { + break; + } + } while (running_handles); + + if (mres != CURLM_OK) { + m_log.Emsg(log_prefix, "Internal libcurl multi-handle error", + curl_multi_strerror(mres)); + char msg[] = "Internal server error due to libcurl"; + curl_multi_remove_handle(multi_handle, curl); + curl_easy_cleanup(curl); + + curl_multi_cleanup(multi_handle); + return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); + } + + // Harvest any messages, looking for CURLMSG_DONE. + CURLMsg *msg; + do { + int msgq = 0; + msg = curl_multi_info_read(multi_handle, &msgq); + if (msg && (msg->msg == CURLMSG_DONE)) { + CURL *easy_handle = msg->easy_handle; + res = msg->data.result; + curl_multi_remove_handle(multi_handle, easy_handle); + curl_easy_cleanup(easy_handle); + } + } while (msg); + + if (res == -1) { // No transfers returned?!? + curl_multi_remove_handle(multi_handle, curl); + curl_easy_cleanup(curl); + curl_multi_cleanup(multi_handle); + char msg[] = "Internal state error in libcurl"; + m_log.Emsg(log_prefix, msg); + return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); + } + curl_multi_cleanup(multi_handle); + + // Generate the final response back to the client. + std::stringstream ss; + if (res != CURLE_OK) { + m_log.Emsg(log_prefix, "Remote server failed request", curl_easy_strerror(res)); + ss << "failure: " << curl_easy_strerror(res); + } else if (state.GetStatusCode() >= 400) { + ss << "failure: Remote side failed with status code " << state.GetStatusCode(); + m_log.Emsg(log_prefix, "Remote server failed request", ss.str().c_str()); + } else { + ss << "success: Created"; + } + + if ((retval = req.ChunkResp(ss.str().c_str(), 0))) { + return retval; + } + return req.ChunkResp(nullptr, 0); +} +#else +int TPCHandler::RunCurlBasic(CURL *curl, XrdHttpExtReq &req, State &state, + const char *log_prefix) { + CURLcode res; + res = curl_easy_perform(curl); + curl_easy_cleanup(curl); + if (res == CURLE_HTTP_RETURNED_ERROR) { + m_log.Emsg(log_prefix, "Remote server failed request", curl_easy_strerror(res)); + return req.SendSimpleResp(500, nullptr, nullptr, + const_cast(curl_easy_strerror(res)), 0); + } else if (state.GetStatusCode() >= 400) { + std::stringstream ss; + ss << "Remote side failed with status code " << state.GetStatusCode(); + m_log.Emsg(log_prefix, "Remote server failed request", ss.str().c_str()); + return req.SendSimpleResp(500, nullptr, nullptr, + const_cast(ss.str().c_str()), 0); + } else if (res) { + m_log.Emsg(log_prefix, "Curl failed", curl_easy_strerror(res)); + char msg[] = "Unknown internal transfer failure"; + return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); + } else { + char msg[] = "Created"; + return req.SendSimpleResp(201, nullptr, nullptr, msg, 0); + } +} +#endif + +int TPCHandler::ProcessPushReq(const std::string & resource, XrdHttpExtReq &req) { + m_log.Emsg("ProcessPushReq", "Starting a push request for resource", resource.c_str()); + CURL *curl = curl_easy_init(); + if (!curl) { + char msg[] = "Failed to initialize internal transfer resources"; + return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); + } + char *name = req.GetSecEntity().name; + std::unique_ptr fh(m_sfs->newFile(name, m_monid++)); + if (!fh.get()) { + char msg[] = "Failed to initialize internal transfer file handle"; + return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); + } + std::string authz = GetAuthz(req); + + int open_results = OpenWaitStall(*fh, req.resource, SFS_O_RDONLY, 0644, + req.GetSecEntity(), authz); + if (SFS_REDIRECT == open_results) { + return RedirectTransfer(req, fh->error); + } else if (SFS_OK != open_results) { + int code; + char msg_generic[] = "Failed to open local resource"; + const char *msg = fh->error.getErrText(code); + if (msg == nullptr) msg = msg_generic; + int status_code = 400; + if (code == EACCES) status_code = 401; + int resp_result = req.SendSimpleResp(status_code, nullptr, nullptr, + const_cast(msg), 0); + fh->close(); + return resp_result; + } + if (!m_cadir.empty()) { + curl_easy_setopt(curl, CURLOPT_CAPATH, m_cadir.c_str()); + } + curl_easy_setopt(curl, CURLOPT_URL, resource.c_str()); + + Stream stream(std::move(fh), 0, 0); + State state(0, stream, curl, true); + state.CopyHeaders(req); + +#ifdef XRD_CHUNK_RESP + return RunCurlWithUpdates(curl, req, state, "ProcessPushReq"); +#else + return RunCurlBasic(curl, req, state, "ProcessPushReq"); +#endif +} + +int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req) { + CURL *curl = curl_easy_init(); + if (!curl) { + char msg[] = "Failed to initialize internal transfer resources"; + return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); + } + char *name = req.GetSecEntity().name; + std::unique_ptr fh(m_sfs->newFile(name, m_monid++)); + if (!fh.get()) { + char msg[] = "Failed to initialize internal transfer file handle"; + return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); + } + std::string authz = GetAuthz(req); + XrdSfsFileOpenMode mode = SFS_O_CREAT; + auto overwrite_header = req.headers.find("Overwrite"); + if ((overwrite_header == req.headers.end()) || (overwrite_header->second == "T")) { + mode = SFS_O_TRUNC; + } + int streams = 1; + { + auto streams_header = req.headers.find("X-Number-Of-Streams"); + if (streams_header != req.headers.end()) { + int stream_req = -1; + try { + stream_req = std::stol(streams_header->second); + } catch (...) { // Handled below + } + if (stream_req < 1 || stream_req > 100) { + char msg[] = "Invalid request for number of streams"; + return req.SendSimpleResp(500, nullptr, nullptr, msg, 0); + } + streams = stream_req; + } + } + + int open_result = OpenWaitStall(*fh, req.resource, mode|SFS_O_WRONLY, 0644, + req.GetSecEntity(), authz); + if (SFS_REDIRECT == open_result) { + return RedirectTransfer(req, fh->error); + } else if (SFS_OK != open_result) { + int code; + char msg_generic[] = "Failed to open local resource"; + const char *msg = fh->error.getErrText(code); + if ((msg == nullptr) || (*msg == '\0')) msg = msg_generic; + int status_code = 400; + if (code == EACCES) status_code = 401; + if (code == EEXIST) status_code = 412; + int resp_result = req.SendSimpleResp(status_code, nullptr, nullptr, + const_cast(msg), 0); + fh->close(); + return resp_result; + } + if (!m_cadir.empty()) { + curl_easy_setopt(curl, CURLOPT_CAPATH, m_cadir.c_str()); + } + curl_easy_setopt(curl, CURLOPT_URL, resource.c_str()); + Stream stream(std::move(fh), streams, m_block_size); + State state(0, stream, curl, false); + state.CopyHeaders(req); + +#ifdef XRD_CHUNK_RESP + if (streams > 1) { + return RunCurlWithStreams(req, state, "ProcessPullReq", streams); + } else { + return RunCurlWithUpdates(curl, req, state, "ProcessPullReq"); + } +#else + return RunCurlBasic(curl, req, state, "ProcessPullReq"); +#endif +} + + +extern "C" { + +XrdHttpExtHandler *XrdHttpGetExtHandler(XrdSysError *log, const char * config, const char * /*parms*/, XrdOucEnv *myEnv) { + if (curl_global_init(CURL_GLOBAL_DEFAULT)) { + log->Emsg("Initialize", "libcurl failed to initialize"); + return nullptr; + } + + TPCHandler *retval{nullptr}; + if (!config) { + log->Emsg("Initialize", "TPC handler requires a config filename in order to load"); + return nullptr; + } + try { + log->Emsg("Initialize", "Will load configuration for the TPC handler from", config); + retval = new TPCHandler(log, config, myEnv); + } catch (std::runtime_error &re) { + log->Emsg("Initialize", "Encountered a runtime failure when loading ", re.what()); + //printf("Provided env vars: %p, XrdInet*: %p\n", myEnv, myEnv->GetPtr("XrdInet*")); + } + return retval; +} + +} + diff --git a/src/XrdTpc/tpc.hh b/src/XrdTpc/tpc.hh new file mode 100644 index 00000000000..cfdd9896301 --- /dev/null +++ b/src/XrdTpc/tpc.hh @@ -0,0 +1,73 @@ + +#include +#include +#include + +#include "XrdHttp/XrdHttpExtHandler.hh" + +class XrdOucErrInfo; +class XrdOucStream; +class XrdSfsFile; +class XrdSfsFileSystem; +typedef void CURL; + +namespace TPC { +class State; + +class TPCHandler : public XrdHttpExtHandler { +public: + TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv); + virtual ~TPCHandler(); + + virtual bool MatchesPath(const char *verb, const char *path) override; + virtual int ProcessReq(XrdHttpExtReq &req) override; + // Abstract method in the base class, but does not seem to be used + virtual int Init(const char *cfgfile) override {return 0;} + +private: + int ProcessOptionsReq(XrdHttpExtReq &req); + + static std::string GetAuthz(XrdHttpExtReq &req); + + int RedirectTransfer(XrdHttpExtReq &req, XrdOucErrInfo &error); + + int OpenWaitStall(XrdSfsFile &fh, const std::string &resource, int mode, + int openMode, const XrdSecEntity &sec, + const std::string &authz); + +#ifdef XRD_CHUNK_RESP + int DetermineXferSize(CURL *curl, XrdHttpExtReq &req, TPC::State &state, + bool &success); + + int SendPerfMarker(XrdHttpExtReq &req, off_t bytes_transferred); + + // Perform the libcurl transfer, periodically sending back chunked updates. + int RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, TPC::State &state, + const char *log_prefix); + + // Experimental multi-stream version of RunCurlWithUpdates + int RunCurlWithStreams(XrdHttpExtReq &req, TPC::State &state, + const char *log_prefix, size_t streams); +#else + int RunCurlBasic(CURL *curl, XrdHttpExtReq &req, TPC::State &state, + const char *log_prefix); +#endif + + int ProcessPushReq(const std::string & resource, XrdHttpExtReq &req); + int ProcessPullReq(const std::string &resource, XrdHttpExtReq &req); + + bool ConfigureFSLib(XrdOucStream &Config, std::string &path1, bool &path1_alt, + std::string &path2, bool &path2_alt); + bool Configure(const char *configfn, XrdOucEnv *myEnv); + + static constexpr int m_marker_period = 5; + static constexpr size_t m_block_size = 16*1024*1024; + bool m_desthttps{false}; + std::string m_cadir; + static std::atomic m_monid; + XrdSysError &m_log; + std::unique_ptr m_sfs; + void *m_handle_base{nullptr}; + void *m_handle_chained{nullptr}; +}; +} diff --git a/src/XrdTpc/xrootd-test-tpc b/src/XrdTpc/xrootd-test-tpc new file mode 100755 index 00000000000..2508444a786 --- /dev/null +++ b/src/XrdTpc/xrootd-test-tpc @@ -0,0 +1,100 @@ +#!/usr/bin/python + +import os +import sys +import argparse + +import requests + +def parse_args(): + parser = argparse.ArgumentParser(description='Drive test TPC transfers') + parser.add_argument("-t", "--token", help="Bearer token for transfer") + parser.add_argument("--dest-token", help="Bearer token for destination host") + parser.add_argument("--src-token", help="Bearer token for source host") + parser.add_argument("--push", dest="mode", action="store_const", const="push", help="Use push-mode for transfer (source manages transfer)", default="auto") + parser.add_argument("--pull", dest="mode", action="store_const", const="pull", help="Use pull-mode for transfer (destination manages transfer)") + parser.add_argument("--no-overwrite", dest="overwrite", action="store_false", default=True, help="Disable overwrite of existing files.") + parser.add_argument("--streams", dest="streams", help="Allow multiple streams", default=1, + type=int) + parser.add_argument("src") + parser.add_argument("dest") + + args = parser.parse_args() + + if args.streams < 1: + print >> sys.stderr, "Invalid number of streams specified: %d" % args.streams + sys.exit(1) + + if not args.token and (not args.src_token or not args.dest_token): + if 'SCITOKEN' in os.environ and os.path.exists(os.environ['SCITOKEN']): + args.token = os.environ['SCITOKEN'] + elif os.path.exists(os.path.expanduser("~/.scitokens/token")): + args.token = os.path.expanduser("~/.scitokens/token") + elif os.path.exists('/tmp/scitoken_u%d' % os.geteuid()): + args.token = '/tmp/scitoken_u%d' % os.geteuid() + else: + print >> sys.stderr, "No token file found in user environment." + sys.exit(1) + + if not args.src_token: + args.src_token = args.token + if not args.dest_token: + args.dest_token = args.token + + return args + +def get_token(fname): + with open(fname, "r") as fp: + for line in fp: + if line.startswith("#"): + continue + return line.strip() + raise Exception("No token found in specified file (%s)" % fname) + +def determine_mode(args): + verbs = requests.options(args.dest) + if 'allow' in verbs.headers: + if 'COPY' in verbs.headers['allow'].split(","): + return 'pull' + return 'push' + +def main(): + args = parse_args() + + src_token = get_token(args.src_token) + dest_token = get_token(args.dest_token) + + headers = {} + if args.overwrite: + headers['Overwrite'] = 'T' + else: + headers['Overwrite'] = 'F' + if args.streams > 1: + headers['X-Number-Of-Streams'] = str(args.streams) + mode = args.mode + if mode == "auto": + mode = determine_mode(args) + print "Auto detect determined %s mode" % mode + + with requests.Session() as session: + session.verify = '/etc/grid-security/certificates' + if mode == 'pull': + headers['Authorization'] = 'Bearer %s' % dest_token + headers['Source'] = args.src + headers['Copy-Header'] = 'Authorization: Bearer %s' % src_token + url = args.dest + else: # Push + headers['Authorization'] = 'Bearer %s' % src_token + headers['Destination'] = args.dest + headers['Copy-Header'] = 'Authorization: Bearer %s' % dest_token + url = args.src + try_again = True + while try_again: + resp = session.request('COPY', url, headers=headers, allow_redirects = True) + print resp.status_code + print resp.headers + print resp.text + break + +if __name__ == '__main__': + main()