Skip to content

Commit

Permalink
Merge branch 'master' into xrdtpc_stream_write_errors
Browse files Browse the repository at this point in the history
  • Loading branch information
bbockelm committed Jan 25, 2021
2 parents 60d2ec0 + f41f567 commit 1f4a2db
Show file tree
Hide file tree
Showing 12 changed files with 117 additions and 33 deletions.
6 changes: 6 additions & 0 deletions packaging/rhel/xrootd.spec.in
Expand Up @@ -946,6 +946,9 @@ fi
%{_libdir}/libXrdPosixPreload.so.2*
%{_libdir}/libXrdSsiLib.so.2*
%{_libdir}/libXrdSsiShMap.so.2*
%if %{?_with_isal:1}%{!?_with_isal:0}
%{_libdir}/libXrdEc.so.1*
%endif
%{_sysconfdir}/xrootd/client.plugins.d/client-plugin.conf.example
%config(noreplace) %{_sysconfdir}/xrootd/client.conf
# This lib may be used for LD_PRELOAD so the .so link needs to be included
Expand Down Expand Up @@ -1001,6 +1004,9 @@ fi
%{_includedir}/xrootd/private
%{_libdir}/libXrdSsiLib.so
%{_libdir}/libXrdSsiShMap.so
%if %{?_with_isal:1}%{!?_with_isal:0}
%{_libdir}/libXrdEc.so
%endif

%files client
%defattr(-,root,root,-)
Expand Down
7 changes: 7 additions & 0 deletions src/XrdCl/CMakeLists.txt
Expand Up @@ -183,17 +183,24 @@ install(

install(
FILES
# Additional client headers
XrdClMessage.hh
XrdClPostMaster.hh
XrdClPostMasterInterfaces.hh
XrdClTransportManager.hh
XrdClResponseJob.hh
XrdClZipArchive.hh
XrdClZipCache.hh
XrdClOperations.hh
XrdClOperationHandlers.hh
# Declarative operations
XrdClArg.hh
XrdClCtx.hh
XrdClFwd.hh
XrdClParallelOperation.hh
XrdClFileOperations.hh
XrdClFileSystemOperations.hh
XrdClZipOperations.hh
DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/xrootd/private/XrdCl )

if( NOT XRDCL_LIB_ONLY )
Expand Down
2 changes: 1 addition & 1 deletion src/XrdCl/XrdClFileStateHandler.cc
Expand Up @@ -1109,7 +1109,7 @@ namespace XrdCl

if( !issupported )
{
DefaultEnv::GetLog()->Info( FileMsg, "[0x%x@%s] PgRead not supported; substituting with Read.",
DefaultEnv::GetLog()->Debug( FileMsg, "[0x%x@%s] PgRead not supported; substituting with Read.",
this, pFileUrl->GetURL().c_str() );
ResponseHandler *substitHandler = new PgReadSubstitutionHandler( this, handler );
st = Read( offset, size, buffer, substitHandler, timeout );
Expand Down
2 changes: 0 additions & 2 deletions src/XrdCl/XrdClZipArchive.hh
Expand Up @@ -25,8 +25,6 @@
#ifndef SRC_XRDZIP_XRDZIPARCHIVE_HH_
#define SRC_XRDZIP_XRDZIPARCHIVE_HH_

#include "XrdCl/XrdClXRootDResponses.hh"
#include "XrdCl/XrdClOperations.hh"
#include "XrdCl/XrdClFile.hh"
#include "XrdCl/XrdClResponseJob.hh"
#include "XrdCl/XrdClJobManager.hh"
Expand Down
38 changes: 28 additions & 10 deletions src/XrdEc/CMakeLists.txt
Expand Up @@ -40,25 +40,19 @@ set( ISAL_HEADERS
${ISAL_BUILDDIR}/include/types.h
)

set( ISAL_LIBS
${ISAL_BUILDDIR}/.libs/libisal.so
${ISAL_BUILDDIR}/.libs/libisal.so.2
${ISAL_BUILDDIR}/.libs/libisal.so.2.0.30
)

ExternalProject_add(
isa-l
SOURCE_DIR ${ISAL_BUILDDIR}
BUILD_IN_SOURCE 1
GIT_REPOSITORY https://github.com/01org/isa-l.git
GIT_TAG ${ISAL_VERSION}
CONFIGURE_COMMAND ./autogen.sh COMMAND ./configure
CONFIGURE_COMMAND ./autogen.sh COMMAND ./configure --with-pic
BUILD_COMMAND make ${MAKEOPTIONS}
INSTALL_COMMAND mkdir -p ${ISAL_INCDIR}/isa-l
COMMAND mkdir -p ${ISAL_LIBDIR}
COMMAND cp ${ISAL_HEADERS} ${ISAL_INCDIR}/isa-l
COMMAND cp ${ISAL_BUILDDIR}/isa-l.h ${ISAL_INCDIR}/isa-l
COMMAND cp ${ISAL_LIBS} ${ISAL_LIBDIR}
COMMAND cp ${ISAL_HEADERS} ${ISAL_INCDIR}/isa-l
COMMAND cp ${ISAL_BUILDDIR}/isa-l.h ${ISAL_INCDIR}/isa-l
COMMAND cp ${ISAL_BUILDDIR}/.libs/libisal.a ${ISAL_LIBDIR}/
)

link_directories( ${ISAL_LIBDIR} )
Expand Down Expand Up @@ -99,3 +93,27 @@ set_target_properties(
SOVERSION ${XRD_EC_SOVERSION} )

add_dependencies( XrdEc isa-l )

#------------------------------------------------------------------------------
# Install XrdEc library
#------------------------------------------------------------------------------
install(
TARGETS XrdEc
RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} )

#------------------------------------------------------------------------------
# Install private header files
#------------------------------------------------------------------------------
install(
FILES
XrdEcReader.hh
XrdEcObjCfg.hh
XrdEcStrmWriter.hh
XrdEcWrtBuff.hh
XrdEcThreadPool.hh
XrdEcUtilities.hh
XrdEcObjCfg.hh
XrdEcConfig.hh
XrdEcRedundancyProvider.hh
DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/xrootd/private/XrdEc )
2 changes: 2 additions & 0 deletions src/XrdEc/XrdEcReader.hh
Expand Up @@ -26,7 +26,9 @@
#define SRC_XRDEC_XRDECREADER_HH_

#include "XrdEc/XrdEcObjCfg.hh"

#include "XrdCl/XrdClZipArchive.hh"
#include "XrdCl/XrdClOperations.hh"

#include <string>
#include <unordered_map>
Expand Down
5 changes: 5 additions & 0 deletions src/XrdEc/XrdEcStrmWriter.cc
Expand Up @@ -27,6 +27,11 @@

#include "XrdOuc/XrdOucCRC32C.hh"

#include "XrdZip/XrdZipLFH.hh"
#include "XrdZip/XrdZipCDFH.hh"
#include "XrdZip/XrdZipEOCD.hh"
#include "XrdZip/XrdZipUtils.hh"

#include <numeric>
#include <algorithm>
#include <future>
Expand Down
9 changes: 2 additions & 7 deletions src/XrdEc/XrdEcStrmWriter.hh
Expand Up @@ -32,11 +32,6 @@
#include "XrdCl/XrdClParallelOperation.hh"
#include "XrdCl/XrdClZipOperations.hh"

#include "XrdZip/XrdZipLFH.hh"
#include "XrdZip/XrdZipCDFH.hh"
#include "XrdZip/XrdZipEOCD.hh"
#include "XrdZip/XrdZipUtils.hh"

#include <random>
#include <chrono>
#include <future>
Expand Down Expand Up @@ -261,7 +256,7 @@ namespace XrdEc
//!
//! @return : the buffer with metadata
//-----------------------------------------------------------------------
XrdZip::buffer_t GetMetadataBuffer();
std::vector<char> GetMetadataBuffer();

//-----------------------------------------------------------------------
//! Close the data object (implementation)
Expand All @@ -274,7 +269,7 @@ namespace XrdEc
std::unique_ptr<WrtBuff> wrtbuff; //< current write buffer
std::vector<std::shared_ptr<XrdCl::ZipArchive>> dataarchs; //< ZIP archives with data
std::vector<std::shared_ptr<XrdCl::File>> metadataarchs; //< ZIP archives with metadata
std::vector<XrdZip::buffer_t> cdbuffs; //< buffers with CDs
std::vector<std::vector<char>> cdbuffs; //< buffers with CDs
buff_queue buffers; //< queue of buffer for writing
//< (waiting to be erasure coded)
std::atomic<bool> writer_thread_stop; //< true if the writer thread should be stopped,
Expand Down
3 changes: 0 additions & 3 deletions src/XrdEc/XrdEcUtilities.hh
Expand Up @@ -30,10 +30,7 @@
#include "XrdCl/XrdClXRootDResponses.hh"
#include "XrdCl/XrdClFileSystem.hh"

#include "XrdCl/XrdClDefaultEnv.hh"
#include "XrdCl/XrdClCheckSumManager.hh"
#include "XrdCl/XrdClUtils.hh"
#include "XrdCks/XrdCksCalc.hh"

#include <exception>
#include <memory>
Expand Down
7 changes: 7 additions & 0 deletions src/XrdHeaders.cmake
Expand Up @@ -129,6 +129,13 @@ set( XROOTD_PRIVATE_HEADERS
XrdOuc/XrdOucExport.hh
XrdOuc/XrdOucPList.hh
XrdOuc/XrdOucN2NLoader.hh
XrdZip/XrdZipCDFH.hh
XrdZip/XrdZipEOCD.hh
XrdZip/XrdZipExtra.hh
XrdZip/XrdZipLFH.hh
XrdZip/XrdZipUtils.hh
XrdZip/XrdZipZIP64EOCD.hh
XrdZip/XrdZipZIP64EOCDL.hh
)

if( NOT XRDCL_ONLY )
Expand Down
65 changes: 55 additions & 10 deletions src/XrdTpc/XrdTpcMultistream.cc
Expand Up @@ -34,7 +34,10 @@ class MultiCurlHandler {
MultiCurlHandler(std::vector<State*> &states, XrdSysError &log) :
m_handle(curl_multi_init()),
m_states(states),
m_log(log)
m_log(log),
m_bytes_transferred(0),
m_error_code(0),
m_status_code(0)
{
if (m_handle == NULL) {
throw CurlHandlerSetupError("Failed to initialize a libcurl multi-handle");
Expand Down Expand Up @@ -81,6 +84,17 @@ class MultiCurlHandler {
state_iter != m_states.end();
state_iter++) {
if (curl == (*state_iter)->GetHandle()) {
m_bytes_transferred += (*state_iter)->BytesTransferred();
int error_code = (*state_iter)->GetErrorCode();
if (error_code && !m_error_code) {
m_error_code = error_code;
m_error_message = (*state_iter)->GetErrorMessage();
}
int status_code = (*state_iter)->GetStatusCode();
if (status_code >= 400 && !m_status_code) {
m_status_code = status_code;
m_error_message = (*state_iter)->GetErrorMessage();
}
(*state_iter)->ResetAfterRequest();
break;
}
Expand Down Expand Up @@ -119,6 +133,34 @@ class MultiCurlHandler {
return current_offset;
}

int Flush() {
int last_error = 0;
for (std::vector<State*>::iterator state_it = m_states.begin();
state_it != m_states.end();
state_it++)
{
int error = (*state_it)->Flush();
if (error) {last_error = error;}
}
return last_error;
}

off_t BytesTransferred() const {
return m_bytes_transferred;
}

int GetStatusCode() const {
return m_status_code;
}

int GetErrorCode() const {
return m_error_code;
}

std::string GetErrorMessage() const {
return m_error_message;
}

private:

bool StartTransfer(off_t offset, size_t size) {
Expand Down Expand Up @@ -204,6 +246,10 @@ class MultiCurlHandler {
std::vector<CURL *> m_active_handles;
std::vector<State*> &m_states;
XrdSysError &m_log;
off_t m_bytes_transferred;
int m_error_code;
int m_status_code;
std::string m_error_message;
};
}

Expand Down Expand Up @@ -366,27 +412,26 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
throw std::runtime_error("Internal state error in libcurl");
}

state.Flush();
state.Finalize();
mch.Flush();

rec.bytes_transferred = state.BytesTransferred();
rec.tpc_status = state.GetStatusCode();
rec.bytes_transferred = mch.BytesTransferred();
rec.tpc_status = mch.GetStatusCode();

// Generate the final response back to the client.
std::stringstream ss;
success = false;
if (state.GetStatusCode() >= 400) {
std::string err = state.GetErrorMessage();
if (mch.GetStatusCode() >= 400) {
std::string err = mch.GetErrorMessage();
std::stringstream ss2;
ss2 << "Remote side failed with status code " << state.GetStatusCode();
ss2 << "Remote side failed with status code " << mch.GetStatusCode();
if (!err.empty()) {
std::replace(err.begin(), err.end(), '\n', ' ');
ss2 << "; error message: \"" << err << "\"";
}
logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss.str());
ss << "failure: " << ss2.str();
} else if (state.GetErrorCode()) {
std::string err = state.GetErrorMessage();
} else if (mch.GetErrorCode()) {
std::string err = mch.GetErrorMessage();
if (err.empty()) {err = "(no error message provided)";}
else {std::replace(err.begin(), err.end(), '\n', ' ');}
std::stringstream ss2;
Expand Down
4 changes: 4 additions & 0 deletions src/XrdTpc/XrdTpcState.cc
Expand Up @@ -200,6 +200,10 @@ ssize_t State::Write(char *buffer, size_t size) {
}

int State::Flush() {
if (m_push) {
return 0;
}

ssize_t retval = m_stream->Write(m_start_offset + m_offset, 0, 0, true);
if (retval == SFS_ERROR) {
m_error_buf = m_stream->GetErrorMessage();
Expand Down

0 comments on commit 1f4a2db

Please sign in to comment.