From 9849c72ed15613956e5acbe5b069a891cc26a3c9 Mon Sep 17 00:00:00 2001 From: Michal Simon Date: Tue, 4 Oct 2016 18:14:42 +0200 Subject: [PATCH] [XrdCl] Extreme copy implementation --- docs/man/xrdcp.1 | 5 + src/XrdCl/CMakeLists.txt | 2 + src/XrdCl/XrdClClassicCopyJob.cc | 148 +++++++- src/XrdCl/XrdClConstants.hh | 1 + src/XrdCl/XrdClCopy.cc | 25 +- src/XrdCl/XrdClCopyProcess.cc | 9 +- src/XrdCl/XrdClDefaultEnv.cc | 1 + src/XrdCl/XrdClMetalinkRedirector.hh | 14 +- src/XrdCl/XrdClRedirectorRegistry.hh | 5 + src/XrdCl/XrdClStatus.cc | 1 + src/XrdCl/XrdClStatus.hh | 1 + src/XrdCl/XrdClSyncQueue.hh | 11 +- src/XrdCl/XrdClXCpCtx.cc | 195 ++++++++++ src/XrdCl/XrdClXCpCtx.hh | 284 ++++++++++++++ src/XrdCl/XrdClXCpSrc.cc | 534 +++++++++++++++++++++++++++ src/XrdCl/XrdClXCpSrc.hh | 364 ++++++++++++++++++ tests/XrdClTests/FileCopyTest.cc | 31 +- 17 files changed, 1611 insertions(+), 20 deletions(-) create mode 100644 src/XrdCl/XrdClXCpCtx.cc create mode 100644 src/XrdCl/XrdClXCpCtx.hh create mode 100644 src/XrdCl/XrdClXCpSrc.cc create mode 100644 src/XrdCl/XrdClXCpSrc.hh diff --git a/docs/man/xrdcp.1 b/docs/man/xrdcp.1 index 540619f639c..c645f5aaddc 100644 --- a/docs/man/xrdcp.1 +++ b/docs/man/xrdcp.1 @@ -446,6 +446,11 @@ XRD_GLFNREDIRECTOR The redirector will be used as a last resort if the GLFN tag is specified in a Metalink file. .RE +XRD_XCPBLOCKSIZE +.RS 5 +Maximum size of a data block assigned to a single source in case of an extreme copy transfer. 
+.RE + .SH NOTES Documentation for all components associated with \fBxrdcp\fR can be found at http://xrootd.org/docs.html diff --git a/src/XrdCl/CMakeLists.txt b/src/XrdCl/CMakeLists.txt index 8821cb14dc7..df2bd38ccde 100644 --- a/src/XrdCl/CMakeLists.txt +++ b/src/XrdCl/CMakeLists.txt @@ -65,6 +65,8 @@ add_library( XrdClMetalinkRedirector.cc XrdClMetalinkRedirector.hh XrdClRedirectorRegistry.cc XrdClRedirectorRegistry.hh XrdClZipArchiveReader.cc XrdClZipArchiveReader.hh + XrdClXCpCtx.cc XrdClXCpCtx.hh + XrdClXCpSrc.cc XrdClXCpSrc.hh ) target_link_libraries( diff --git a/src/XrdCl/XrdClClassicCopyJob.cc b/src/XrdCl/XrdClClassicCopyJob.cc index 0aaaa449886..6de5018f275 100644 --- a/src/XrdCl/XrdClClassicCopyJob.cc +++ b/src/XrdCl/XrdClClassicCopyJob.cc @@ -34,7 +34,6 @@ #include "XrdCl/XrdClUglyHacks.hh" #include "XrdCl/XrdClRedirectorRegistry.hh" #include "XrdCl/XrdClZipArchiveReader.hh" - #include #include #include @@ -45,6 +44,7 @@ #include #include #include +#include "XrdClXCpCtx.hh" namespace { @@ -1078,6 +1078,139 @@ namespace bool pDone; }; + //---------------------------------------------------------------------------- + //! XRootDSourceDynamic + //---------------------------------------------------------------------------- + class XRootDSourceXCp: public Source + { + public: + //------------------------------------------------------------------------ + //! Constructor + //------------------------------------------------------------------------ + XRootDSourceXCp( const XrdCl::URL* url, uint32_t chunkSize, uint16_t parallelChunks, int32_t nbSrc, uint64_t blockSize ): + pXCpCtx( 0 ), pUrl( url ), pChunkSize( chunkSize ), pParallelChunks( parallelChunks ), pNbSrc( nbSrc ), pBlockSize( blockSize ) + { + } + + ~XRootDSourceXCp() + { + if( pXCpCtx ) + pXCpCtx->Delete(); + } + + //------------------------------------------------------------------------ + //! 
Initialize the source + //------------------------------------------------------------------------ + virtual XrdCl::XRootDStatus Initialize() + { + XrdCl::Log *log = XrdCl::DefaultEnv::GetLog(); + int64_t fileSize = -1; + + if( pUrl->IsMetalink() ) + { + XrdCl::RedirectorRegistry ®istry = XrdCl::RedirectorRegistry::Instance(); + XrdCl::VirtualRedirector *redirector = registry.Get( *pUrl ); + fileSize = redirector->GetSize(); + pReplicas = redirector->GetReplicas(); + } + else + { + XrdCl::LocationInfo *li = 0; + XrdCl::FileSystem fs( *pUrl ); + XrdCl::XRootDStatus st = fs.DeepLocate( pUrl->GetPath(), XrdCl::OpenFlags::Compress | XrdCl::OpenFlags::PrefName, li ); + if( !st.IsOK() ) return st; + + XrdCl::LocationInfo::Iterator itr; + for( itr = li->Begin(); itr != li->End(); ++itr) + { + std::string url = "root://" + itr->GetAddress() + "/" + pUrl->GetPath(); + pReplicas.push_back( url ); + } + + delete li; + } + + std::stringstream ss; + ss << "XCp sources: "; + + std::vector::iterator itr; + for( itr = pReplicas.begin() ; itr != pReplicas.end() ; ++itr ) + { + ss << *itr << ", "; + } + log->Debug( XrdCl::UtilityMsg, ss.str().c_str() ); + + pXCpCtx = new XrdCl::XCpCtx( pReplicas, pBlockSize, pNbSrc, pChunkSize, pParallelChunks, fileSize ); + + return pXCpCtx->Initialize(); + } + + //------------------------------------------------------------------------ + //! Get size + //------------------------------------------------------------------------ + virtual int64_t GetSize() + { + return pXCpCtx->GetSize(); + } + + //------------------------------------------------------------------------ + //! Get a data chunk from the source + //! + //! @param buffer buffer for the data + //! @param ci chunk information + //! @return status of the operation + //! suContinue - there are some chunks left + //! 
suDone - no chunks left + //------------------------------------------------------------------------ + virtual XrdCl::XRootDStatus GetChunk( XrdCl::ChunkInfo &ci ) + { + XrdCl::XRootDStatus st; + do + { + st = pXCpCtx->GetChunk( ci ); + } + while( st.IsOK() && st.code == XrdCl::suRetry ); + return st; + } + + //------------------------------------------------------------------------ + // Get check sum + //------------------------------------------------------------------------ + virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum, + std::string &checkSumType ) + { + if( pUrl->IsMetalink() ) + { + XrdCl::RedirectorRegistry ®istry = XrdCl::RedirectorRegistry::Instance(); + XrdCl::VirtualRedirector *redirector = registry.Get( *pUrl ); + checkSum = redirector->GetCheckSum( checkSumType ); + if( !checkSum.empty() ) return XrdCl::XRootDStatus(); + } + + std::vector::iterator itr; + for( itr = pReplicas.begin() ; itr != pReplicas.end() ; ++itr ) + { + XrdCl::URL url( *itr ); + XrdCl::XRootDStatus st = XrdCl::Utils::GetRemoteCheckSum( checkSum, + checkSumType, url.GetHostId(), url.GetPath() ); + if( st.IsOK() ) return st; + } + + return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errNoMoreReplicas ); + } + + private: + + + XrdCl::XCpCtx *pXCpCtx; + const XrdCl::URL *pUrl; + std::vector pReplicas; + uint32_t pChunkSize; + uint16_t pParallelChunks; + int32_t pNbSrc; + uint64_t pBlockSize; + }; + //---------------------------------------------------------------------------- //! 
Local destination //---------------------------------------------------------------------------- @@ -1613,7 +1746,9 @@ namespace XrdCl std::string zipSource; uint16_t parallelChunks; uint32_t chunkSize; - bool posc, force, coerce, makeDir, dynamicSource, zip; + uint64_t blockSize; + bool posc, force, coerce, makeDir, dynamicSource, zip, xcp; + int32_t nbXcpSources; pProperties->Get( "checkSumMode", checkSumMode ); pProperties->Get( "checkSumType", checkSumType ); @@ -1626,15 +1761,22 @@ namespace XrdCl pProperties->Get( "makeDir", makeDir ); pProperties->Get( "dynamicSource", dynamicSource ); pProperties->Get( "zipArchive", zip ); + pProperties->Get( "xcp", xcp ); + pProperties->Get( "xcpBlockSize", blockSize ); if( zip ) pProperties->Get( "zipSource", zipSource ); + if( xcp ) + pProperties->Get( "nbXcpSources", nbXcpSources ); + //-------------------------------------------------------------------------- // Initialize the source and the destination //-------------------------------------------------------------------------- XRDCL_SMART_PTR_T src; - if( zip ) + if( xcp ) + src.reset( new XRootDSourceXCp( &GetSource(), chunkSize, parallelChunks, nbXcpSources, blockSize ) ); + else if( zip ) // TODO make zip work for xcp src.reset( new XRootDSourceZip( zipSource, &GetSource(), chunkSize, parallelChunks ) ); else if( GetSource().GetProtocol() == "file" ) src.reset( new LocalSource( &GetSource(), checkSumType, chunkSize ) ); diff --git a/src/XrdCl/XrdClConstants.hh b/src/XrdCl/XrdClConstants.hh index 4f6a899c5c7..1907ac5be3c 100644 --- a/src/XrdCl/XrdClConstants.hh +++ b/src/XrdCl/XrdClConstants.hh @@ -66,6 +66,7 @@ namespace XrdCl const int DefaultParallelEvtLoop = 1; const int DefaultMetalinkProcessing = 1; const int DefaultLocalMetalinkFile = 1; + const int DefaultXCpBlockSize = 134217728; // DefaultCPChunkSize * DefaultCPParallelChunks * 2 const char * const DefaultPollerPreference = "built-in"; const char * const DefaultNetworkStack = "IPAuto"; diff --git 
a/src/XrdCl/XrdClCopy.cc b/src/XrdCl/XrdClCopy.cc index 175947b0e9e..a45a0093d4c 100644 --- a/src/XrdCl/XrdClCopy.cc +++ b/src/XrdCl/XrdClCopy.cc @@ -306,12 +306,6 @@ bool AllOptionsSupported( XrdCpConfig *config ) return false; } - if( config->nSrcs != 1 ) - { - std::cerr << "Multiple sources are not yet supported" << std::endl; - return false; - } - return true; } @@ -612,6 +606,17 @@ int main( int argc, char **argv ) zip = true; } + //---------------------------------------------------------------------------- + // Extreme Copy + //---------------------------------------------------------------------------- + int nbSources = 0; + bool xcp = false; + if( config.Want( XrdCpConfig::DoSources ) ) + { + nbSources = config.nSrcs; + xcp = true; + } + //---------------------------------------------------------------------------- // Environment settings //---------------------------------------------------------------------------- @@ -622,6 +627,9 @@ int main( int argc, char **argv ) int chunkSize = DefaultCPChunkSize; env->GetInt( "CPChunkSize", chunkSize ); + int blockSize = DefaultXCpBlockSize; + env->GetInt( "XCpBlockSize", blockSize ); + int parallelChunks = DefaultCPParallelChunks; env->GetInt( "CPParallelChunks", parallelChunks ); if( parallelChunks < 1 || @@ -795,10 +803,15 @@ int main( int argc, char **argv ) properties.Set( "chunkSize", chunkSize ); properties.Set( "parallelChunks", parallelChunks ); properties.Set( "zipArchive", zip ); + properties.Set( "xcp", xcp ); + properties.Set( "xcpBlockSize", blockSize ); if( zip ) properties.Set( "zipSource", zipFile ); + if( xcp ) + properties.Set( "nbXcpSources", nbSources ); + XRootDStatus st = process.AddJob( properties, results ); if( !st.IsOK() ) diff --git a/src/XrdCl/XrdClCopyProcess.cc b/src/XrdCl/XrdClCopyProcess.cc index 9952c24842c..666ad25bbce 100644 --- a/src/XrdCl/XrdClCopyProcess.cc +++ b/src/XrdCl/XrdClCopyProcess.cc @@ -167,7 +167,7 @@ namespace XrdCl pJobProperties.push_back( properties ); 
PropertyList &p = pJobProperties.back(); - const char *bools[] = {"target", "force", "posc", "coerce", "makeDir", "zipArchive", 0}; + const char *bools[] = {"target", "force", "posc", "coerce", "makeDir", "zipArchive", "xcp", 0}; for( int i = 0; bools[i]; ++i ) if( !p.HasProperty( bools[i] ) ) p.Set( bools[i], false ); @@ -201,6 +201,13 @@ namespace XrdCl p.Set( "chunkSize", val ); } + if( !p.HasProperty( "xcpBlockSize" ) ) + { + int val = DefaultXCpBlockSize; + env->GetInt( "XCpBlockSize", val ); + p.Set( "xcpBlockSize", val ); + } + if( !p.HasProperty( "initTimeout" ) ) { int val = DefaultCPInitTimeout; diff --git a/src/XrdCl/XrdClDefaultEnv.cc b/src/XrdCl/XrdClDefaultEnv.cc index 15af43695a6..c73f9dc6dc5 100644 --- a/src/XrdCl/XrdClDefaultEnv.cc +++ b/src/XrdCl/XrdClDefaultEnv.cc @@ -285,6 +285,7 @@ namespace XrdCl REGISTER_VAR_INT( varsInt, "ParallelEvtLoop", DefaultParallelEvtLoop ); REGISTER_VAR_INT( varsInt, "MetalinkProcessing", DefaultMetalinkProcessing ); REGISTER_VAR_INT( varsInt, "LocalMetalinkFile", DefaultLocalMetalinkFile ); + REGISTER_VAR_INT( varsInt, "XCpBlockSize", DefaultXCpBlockSize ); REGISTER_VAR_STR( varsStr, "PollerPreference", DefaultPollerPreference ); REGISTER_VAR_STR( varsStr, "ClientMonitor", DefaultClientMonitor ); diff --git a/src/XrdCl/XrdClMetalinkRedirector.hh b/src/XrdCl/XrdClMetalinkRedirector.hh index da51c906915..718f390c055 100644 --- a/src/XrdCl/XrdClMetalinkRedirector.hh +++ b/src/XrdCl/XrdClMetalinkRedirector.hh @@ -78,11 +78,23 @@ class MetalinkRedirector : public VirtualRedirector return type + ":" + it->second; } + //---------------------------------------------------------------------------- + //! Returns the file size if specified in the metalink file, + //! otherwise a negative number + //---------------------------------------------------------------------------- long long GetSize() const { return pFileSize; } + //---------------------------------------------------------------------------- + //! 
Returns a vector with replicas as given in the metalink file + //---------------------------------------------------------------------------- + const std::vector& GetReplicas() + { + return pReplicas; + } + private: //---------------------------------------------------------------------------- @@ -144,7 +156,7 @@ class MetalinkRedirector : public VirtualRedirector typedef std::list< std::pair > RedirectList; typedef std::map CksumMap; - typedef std::list ReplicaList; + typedef std::vector ReplicaList; RedirectList pPendingRedirects; std::string pUrl; diff --git a/src/XrdCl/XrdClRedirectorRegistry.hh b/src/XrdCl/XrdClRedirectorRegistry.hh index 417521cdef6..3e898c98851 100644 --- a/src/XrdCl/XrdClRedirectorRegistry.hh +++ b/src/XrdCl/XrdClRedirectorRegistry.hh @@ -60,6 +60,11 @@ class VirtualRedirector //! or a negative number if size was not specified //---------------------------------------------------------------------------- virtual long long GetSize() const = 0; + + //---------------------------------------------------------------------------- + //! 
Returns a vector with replicas as given in the metalink file + //---------------------------------------------------------------------------- + virtual const std::vector& GetReplicas() = 0; }; //-------------------------------------------------------------------------------- diff --git a/src/XrdCl/XrdClStatus.cc b/src/XrdCl/XrdClStatus.cc index c6c4ba18f03..0b363ad8073 100644 --- a/src/XrdCl/XrdClStatus.cc +++ b/src/XrdCl/XrdClStatus.cc @@ -44,6 +44,7 @@ namespace { errNotSupported, "Operation not supported" }, { errDataError, "Received corrupted data" }, { errNotImplemented, "Operation is not implemented" }, + { errNoMoreReplicas, "No more replicas to try" }, { errInvalidAddr, "Invalid address" }, { errSocketError, "Socket error" }, { errSocketTimeout, "Socket timeout" }, diff --git a/src/XrdCl/XrdClStatus.hh b/src/XrdCl/XrdClStatus.hh index a9822c3cba7..b821f19f377 100644 --- a/src/XrdCl/XrdClStatus.hh +++ b/src/XrdCl/XrdClStatus.hh @@ -61,6 +61,7 @@ namespace XrdCl const uint16_t errNotSupported = 13; const uint16_t errDataError = 14; //!< data is corrupted const uint16_t errNotImplemented = 15; //!< Operation is not implemented + const uint16_t errNoMoreReplicas = 16; //!< No more replicas to try //---------------------------------------------------------------------------- // Socket related errors diff --git a/src/XrdCl/XrdClSyncQueue.hh b/src/XrdCl/XrdClSyncQueue.hh index 1ac25dba00e..0e48f020fde 100644 --- a/src/XrdCl/XrdClSyncQueue.hh +++ b/src/XrdCl/XrdClSyncQueue.hh @@ -88,7 +88,16 @@ namespace XrdCl pSem = new Semaphore(0); } - private: + //------------------------------------------------------------------------ + //! 
Check if the queue is empty + //------------------------------------------------------------------------ + bool IsEmpty() + { + XrdSysMutexHelper scopedLock( pMutex ); + return pQueue.empty(); + } + + protected: std::queue pQueue; XrdSysMutex pMutex; Semaphore *pSem; diff --git a/src/XrdCl/XrdClXCpCtx.cc b/src/XrdCl/XrdClXCpCtx.cc new file mode 100644 index 00000000000..e40061c2463 --- /dev/null +++ b/src/XrdCl/XrdClXCpCtx.cc @@ -0,0 +1,195 @@ +//------------------------------------------------------------------------------ +// Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN) +// Author: Michal Simon +//------------------------------------------------------------------------------ +// This file is part of the XRootD software suite. +// +// XRootD is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// XRootD is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with XRootD. If not, see . +// +// In applying this licence, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. 
+//------------------------------------------------------------------------------ + +#include "XrdCl/XrdClXCpCtx.hh" +#include "XrdCl/XrdClXCpSrc.hh" +#include "XrdCl/XrdClLog.hh" +#include "XrdCl/XrdClDefaultEnv.hh" +#include "XrdCl/XrdClConstants.hh" + +#include + +namespace XrdCl +{ + +XCpCtx::XCpCtx( const std::vector &urls, uint64_t blockSize, uint8_t parallelSrc, uint64_t chunkSize, uint64_t parallelChunks, int64_t fileSize ) : + pUrls( std::deque( urls.begin(), urls.end() ) ), pBlockSize( blockSize ), + pParallelSrc( parallelSrc ), pChunkSize( chunkSize ), pParallelChunks( parallelChunks ), + pOffset( 0 ), pFileSize( -1 ), pFileSizeCV( 0 ), pDataReceived( 0 ), pDone( false ), + pDoneCV( 0 ), pRefCount( 1 ) +{ + SetFileSize( fileSize ); +} + +XCpCtx::~XCpCtx() +{ + // at this point there's no concurrency + // this object dies as the last one + while( !pSink.IsEmpty() ) + { + ChunkInfo *chunk = pSink.Get(); + if( chunk ) + XCpSrc::DeleteChunk( chunk ); + } +} + +bool XCpCtx::GetNextUrl( std::string & url ) +{ + XrdSysMutexHelper lck( pMtx ); + if( pUrls.empty() ) return false; + url = pUrls.front(); + pUrls.pop(); + return true; +} + +XCpSrc* XCpCtx::WeakestLink( XCpSrc *exclude ) +{ + uint64_t transferRate = -1; // set transferRate to max uint64 value + XCpSrc *ret = 0; + + std::list::iterator itr; + for( itr = pSources.begin() ; itr != pSources.end() ; ++itr ) + { + XCpSrc *src = *itr; + if( src == exclude ) continue; + uint64_t tmp = src->TransferRate(); + if( src->HasData() && tmp < transferRate ) + { + ret = src; + transferRate = tmp; + } + } + + return ret; +} + +void XCpCtx::PutChunk( ChunkInfo* chunk ) +{ + pSink.Put( chunk ); +} + +std::pair XCpCtx::GetBlock() +{ + XrdSysMutexHelper lck( pMtx ); + + uint64_t blkSize = pBlockSize, offset = pOffset; + if( pOffset + blkSize > uint64_t( pFileSize ) ) + blkSize = pFileSize - pOffset; + pOffset += blkSize; + + return std::make_pair( offset, blkSize ); +} + +void XCpCtx::SetFileSize( int64_t size ) +{ + 
XrdSysMutexHelper lck( pMtx ); + if( pFileSize < 0 && size >= 0 ) + { + XrdSysCondVarHelper lck( pFileSizeCV ); + pFileSize = size; + pFileSizeCV.Broadcast(); + + if( pBlockSize > uint64_t( pFileSize ) / pParallelSrc ) + pBlockSize = pFileSize / pParallelSrc; + + if( pBlockSize < pChunkSize ) + pBlockSize = pChunkSize; + } +} + +XRootDStatus XCpCtx::Initialize() +{ + for( uint8_t i = 0; i < pParallelSrc; ++i ) + { + XCpSrc *src = new XCpSrc( pChunkSize, pParallelChunks, pFileSize, this ); + pSources.push_back( src ); + src->Start(); + } + + if( pSources.empty() ) + { + Log *log = DefaultEnv::GetLog(); + log->Error( UtilityMsg, "Failed to initialize (failed to create new threads)" ); + return XRootDStatus( stError, errInternal, EAGAIN, "XCpCtx: failed to create new threads." ); + } + + return XRootDStatus(); +} + +XRootDStatus XCpCtx::GetChunk( XrdCl::ChunkInfo &ci ) +{ + // if we received all the data we are done here + if( pDataReceived == uint64_t( pFileSize ) ) + { + XrdSysCondVarHelper lck( pDoneCV ); + pDone = true; + pDoneCV.Broadcast(); + return XRootDStatus( stOK, suDone ); + } + + // check if there are any active sources + size_t nbRunning = 0; + std::list::iterator itr; + for( itr = pSources.begin() ; itr != pSources.end() ; ++ itr) + if( (*itr)->IsRunning() ) + ++nbRunning; + + // if we don't have active sources it means we failed + if( nbRunning == 0 ) + { + XrdSysCondVarHelper lck( pDoneCV ); + pDone = true; + pDoneCV.Broadcast(); + return XRootDStatus( stError, errNoMoreReplicas ); + } + + ChunkInfo *chunk = pSink.Get(); + if( chunk ) + { + pDataReceived += chunk->length; + ci = *chunk; + delete chunk; + return XRootDStatus( stOK, suContinue ); + } + + return XRootDStatus( stOK, suRetry ); +} + +void XCpCtx::NotifyIdleSrc() +{ + pDoneCV.Broadcast(); +} + +bool XCpCtx::AllDone() +{ + XrdSysCondVarHelper lck( pDoneCV ); + + if( !pDone ) + pDoneCV.Wait( 60 ); + + return pDone; +} + + +} /* namespace XrdCl */ diff --git a/src/XrdCl/XrdClXCpCtx.hh 
b/src/XrdCl/XrdClXCpCtx.hh new file mode 100644 index 00000000000..6179f60e086 --- /dev/null +++ b/src/XrdCl/XrdClXCpCtx.hh @@ -0,0 +1,284 @@ +//------------------------------------------------------------------------------ +// Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN) +// Author: Michal Simon +//------------------------------------------------------------------------------ +// This file is part of the XRootD software suite. +// +// XRootD is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// XRootD is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with XRootD. If not, see . +// +// In applying this licence, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. 
+//------------------------------------------------------------------------------ + +#ifndef SRC_XRDCL_XRDCLXCPCTX_HH_ +#define SRC_XRDCL_XRDCLXCPCTX_HH_ + +#include "XrdCl/XrdClSyncQueue.hh" +#include "XrdCl/XrdClXRootDResponses.hh" +#include "XrdSys/XrdSysPthread.hh" + +#include +#include + +namespace XrdCl +{ + +class XCpSrc; + +class XCpCtx +{ + public: + + /** + * Constructor + * + * @param urls : list of replica urls + * @param blockSize : the default block size + * @param parallelSrc : maximum number of parallel sources + * @param chunkSize : the default chunk size + * @param parallelChunks : the default number of parallel chunks per source + * @param fileSize : the file size if specified in the metalink file + * (-1 indicates that the file size is not known and + * a stat should be done) + */ + XCpCtx( const std::vector &urls, uint64_t blockSize, uint8_t parallelSrc, uint64_t chunkSize, uint64_t parallelChunks, int64_t fileSize ); + + /** + * Deletes the instance if the reference counter reached 0. + */ + void Delete() + { + XrdSysMutexHelper lck( pMtx ); + --pRefCount; + if( !pRefCount ) + delete this; + } + + /** + * Increments the reference counter. + * + * @return : myself. 
+ */ + XCpCtx* Self() + { + XrdSysMutexHelper lck( pMtx ); + ++pRefCount; + return this; + } + + /** + * Gets the next URL from the list of file replicas + * + * @param url : the output parameter + * @return : true if a url has been written to the + * url parameter, false otherwise + */ + bool GetNextUrl( std::string & url ); + + /** + * Get the 'weakest' sources + * + * @param exclude : the source that is excluded from the + * search + * @return : the weakest source + */ + XCpSrc* WeakestLink( XCpSrc *exclude ); + + /** + * Put a chunk into the sink + * + * @param chunk : the chunk + */ + void PutChunk( ChunkInfo* chunk ); + + /** + * Get next block that has to be transfered + * + * @return : pair of offset and block size + */ + std::pair GetBlock(); + + /** + * Set the file size (GetSize will block until + * SetFileSize will be called). + * Also calculates the block size. + * + * @param size : file size + */ + void SetFileSize( int64_t size ); + + /** + * Get file size. The call blocks until the file + * size is being set using SetFileSize. + */ + int64_t GetSize() + { + XrdSysCondVarHelper lck( pFileSizeCV ); + while( pFileSize < 0 ) pFileSizeCV.Wait(); + return pFileSize; + } + + /** + * Starts one thread per source, each thread + * tries to open a file, stat the file if necessary, + * and then starts reading the file, all chunks read + * go to the sink. + * + * @return Error if we were not able to create any threads + */ + XRootDStatus Initialize(); + + /** + * Gets the next chunk from the sink, if the sink is empty blocks. + * + * @param ci : the chunk retrieved from sink (output parameter) + * @retrun : stError if we failed to transfer the file, + * stOK otherwise, with one of the following codes: + * - suDone : the whole file has been transfered, + * we are done + * - suContinue : a chunk has been written into ci, + * continue calling GetChunk in order + * to retrieve remaining chunks + * - suRetry : a chunk has not been written into ci, + * try again. 
+ */ + XRootDStatus GetChunk( XrdCl::ChunkInfo &ci ); + + /** + * Remove given source + * + * @param src : the source to be removed + */ + void RemoveSrc( XCpSrc *src ) + { + XrdSysMutexHelper lck( pMtx ); + pSources.remove( src ); + } + + /** + * Notify idle sources, used in two case: + * - if one of the sources failed and an + * idle source needs to take over + * - or if we are done and all idle source + * should be stopped + */ + void NotifyIdleSrc(); + + /** + * Returns true if all chunks have been transfered, + * otherwise blocks until NotifyIdleSrc is called, + * or a 1 minute timeout occurs. + * + * @return : true is all chunks have been transfered, + * false otherwise. + */ + bool AllDone(); + + private: + + /** + * Destructor (private). + * + * Use Delelte to destroy the object. + */ + virtual ~XCpCtx(); + + /** + * The URLs of all the replicas that were provided + * to us. + */ + std::queue pUrls; + + /** + * The size of the block allocated to a single source. + */ + uint64_t pBlockSize; + + /** + * Number of parallel sources. + */ + uint8_t pParallelSrc; + + /** + * Chunk size. + */ + uint32_t pChunkSize; + + /** + * Number of parallel chunks per source. + */ + uint8_t pParallelChunks; + + /** + * Offset in the file (everything before the offset + * has been allocated, everything after the offset + * needs to be allocated) + */ + uint64_t pOffset; + + /** + * File size. + */ + int64_t pFileSize; + + /** + * File Size conditional variable. + * (notifies waiters if the file size has been set) + */ + XrdSysCondVar pFileSizeCV; + + /** + * List of sources. Those pointers are not owned by + * this object. + */ + std::list pSources; + + /** + * A queue shared between all the sources (producers), + * and the extreme copy context (consumer). 
+ */ + SyncQueue pSink; + + /** + * Total amount of data received + */ + uint64_t pDataReceived; + + /** + * A flag, true if all chunks have been received and we are done, + * false otherwise + */ + bool pDone; + + /** + * A condition variable, idle sources wait on this cond var until + * we are done, or until one of the active sources fails. + */ + XrdSysCondVar pDoneCV; + + /** + * A mutex guarding the object + */ + XrdSysMutex pMtx; + + /** + * Reference counter + */ + size_t pRefCount; +}; + +} /* namespace XrdCl */ + +#endif /* SRC_XRDCL_XRDCLXCPCTX_HH_ */ diff --git a/src/XrdCl/XrdClXCpSrc.cc b/src/XrdCl/XrdClXCpSrc.cc new file mode 100644 index 00000000000..480d597b554 --- /dev/null +++ b/src/XrdCl/XrdClXCpSrc.cc @@ -0,0 +1,534 @@ +//------------------------------------------------------------------------------ +// Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN) +// Author: Michal Simon +//------------------------------------------------------------------------------ +// This file is part of the XRootD software suite. +// +// XRootD is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// XRootD is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with XRootD. If not, see . +// +// In applying this licence, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. 
+//------------------------------------------------------------------------------ + +#include "XrdCl/XrdClXCpSrc.hh" +#include "XrdCl/XrdClXCpCtx.hh" +#include "XrdCl/XrdClLog.hh" +#include "XrdCl/XrdClDefaultEnv.hh" +#include "XrdCl/XrdClConstants.hh" + +#include +#include + +namespace XrdCl +{ + +class ChunkHandler: public ResponseHandler +{ + public: + + ChunkHandler( XCpSrc *src, uint64_t offset, uint64_t size, char *buffer, File *handle ) : + pSrc( src->Self() ), pOffset( offset ), pSize( size ), pBuffer( buffer ), pHandle( handle ) + { + + } + + virtual ~ChunkHandler() + { + pSrc->Delete(); + } + + virtual void HandleResponse( XRootDStatus *status, AnyObject *response ) + { + ChunkInfo *chunk = 0; + if( response ) // take ownership of the chunk carried by the response + { + response->Get( chunk ); + response->Set( ( int* )0 ); + delete response; + } + + if( !chunk && status->IsOK() ) // an OK status without a chunk is turned into an internal error + { + *status = XRootDStatus( stError, errInternal ); + } + + if( status->IsOK() && chunk->length != pSize ) // status is tested first: chunk may be null on error; + { // a length mismatch is reported as a data error + *status = XRootDStatus( stError, errDataError ); + } + + if( !status->IsOK() ) + { + delete[] pBuffer; + delete chunk; + chunk = 0; + } + + pSrc->ReportResponse( status, chunk, pHandle ); + + delete this; + } + + private: + + XCpSrc *pSrc; + uint64_t pOffset; + uint64_t pSize; + char *pBuffer; + File *pHandle; +}; + + +XCpSrc::XCpSrc( uint32_t chunkSize, uint8_t parallel, int64_t fileSize, XCpCtx *ctx ) : + pChunkSize( chunkSize ), pParallel( parallel ), pFileSize( fileSize ), pThread(), + pCtx( ctx->Self() ), pFile( 0 ), pCurrentOffset( 0 ), pBlkEnd( 0 ), pDataTransfered( 0 ), pRefCount( 1 ), + pRunning( false ), pStartTime( 0 ), pTransferTime( 0 ) +{ + +} + +XCpSrc::~XCpSrc() +{ + pCtx->RemoveSrc( this ); + pCtx->Delete(); +} + +void XCpSrc::Start() +{ + pRunning = true; + int rc = pthread_create( 
&pThread, 0, Run, this ); + if( rc ) + { + pRunning = 
false; + pCtx->RemoveSrc( this ); + pCtx->Delete(); + } +} + +void* XCpSrc::Run( void* arg ) +{ + XCpSrc *me = static_cast( arg ); + me->StartDownloading(); + me->Delete(); + return 0; +} + +void XCpSrc::StartDownloading() +{ + XRootDStatus st = Initialize(); + if( !st.IsOK() ) + { + pRunning = false; + return; + } + + // start counting transfer time + pStartTime = time( 0 ); + + while( pRunning ) + { + st = ReadChunks(); + if( st.IsOK() && st.code == suPartial ) + { + // we have only ongoing transfers + // so we can already ask for new block + if( GetWork().IsOK() ) continue; + } + else if( st.IsOK() && st.code == suDone ) + { + // if we are done, try to get more work, + // if successful continue + if( GetWork().IsOK() ) continue; + // keep track of the time before we go idle + pTransferTime += time( 0 ) - pStartTime; + // check if the overall download process is + // done, this makes the thread wait until + // either the download is done, or a source + // went to error, or a 60s timeout has been + // reached (the timeout is there so we can + // check if a source degraded in the meanwhile + // and now we can steal from it) + if( !pCtx->AllDone() ) + { + // reset start time after pause + pStartTime = time( 0 ); + continue; + } + // stop counting + // otherwise we are done here + pRunning = false; + return; + } + + XRootDStatus *status = pReports.Get(); + if( !status->IsOK() ) + { + Log *log = DefaultEnv::GetLog(); + std::string myHost = URL( pUrl ).GetHostName(); + log->Error( UtilityMsg, "Failed to read chunk from %s: %s", myHost.c_str(), status->GetErrorMessage().c_str() ); + + if( !Recover().IsOK() ) + { + delete status; + pRunning = false; + // notify idle sources, they might be + // interested in taking over my workload + pCtx->NotifyIdleSrc(); + // put a null chunk so we are sure + // the main thread doesn't get stuck + // at the sync queue + pCtx->PutChunk( 0 ); + // if we have data we need to wait for someone to take over + // unless the extreme copy is 
over, in this case we don't care + while( HasData() && !pCtx->AllDone() ); + + return; + } + } + delete status; + } +} + +XRootDStatus XCpSrc::Initialize() +{ + Log *log = DefaultEnv::GetLog(); + XRootDStatus st; + + do + { + if( !pCtx->GetNextUrl( pUrl ) ) + { + log->Error( UtilityMsg, "Failed to initialize XCp source, no more replicas to try" ); + return XRootDStatus( stError ); + } + + log->Debug( UtilityMsg, "Opening %s for reading", pUrl.c_str() ); + + std::string value; + DefaultEnv::GetEnv()->GetString( "ReadRecovery", value ); + + pFile = new File(); + pFile->SetProperty( "ReadRecovery", value ); + + st = pFile->Open( pUrl, OpenFlags::Read ); + if( !st.IsOK() ) + { + log->Warning( UtilityMsg, "Failed to open %s for reading: %s", pUrl.c_str(), st.GetErrorMessage().c_str() ); + DeletePtr( pFile ); + continue; + } + + if( pFileSize < 0 ) + { + StatInfo *statInfo = 0; + st = pFile->Stat( false, statInfo ); + if( !st.IsOK() ) + { + log->Warning( UtilityMsg, "Failed to stat %s: %s", pUrl.c_str(), st.GetErrorMessage().c_str() ); + DeletePtr( pFile ); + continue; + } + pFileSize = statInfo->GetSize(); + pCtx->SetFileSize( pFileSize ); + delete statInfo; + } + } + while( !st.IsOK() ); + + std::pair p = pCtx->GetBlock(); + pCurrentOffset = p.first; + pBlkEnd = p.second + p.first; + + return st; +} + +XRootDStatus XCpSrc::Recover() +{ + Log *log = DefaultEnv::GetLog(); + XRootDStatus st; + + do + { + if( !pCtx->GetNextUrl( pUrl ) ) + { + log->Error( UtilityMsg, "Failed to initialize XCp source, no more replicas to try" ); + return XRootDStatus( stError ); + } + + log->Debug( UtilityMsg, "Opening %s for reading", pUrl.c_str() ); + + std::string value; + DefaultEnv::GetEnv()->GetString( "ReadRecovery", value ); + + pFile = new File(); + pFile->SetProperty( "ReadRecovery", value ); + + st = pFile->Open( pUrl, OpenFlags::Read ); + if( !st.IsOK() ) + { + DeletePtr( pFile ); + log->Warning( UtilityMsg, "Failed to open %s for reading: %s", pUrl.c_str(), 
st.GetErrorMessage().c_str() ); + } + } + while( !st.IsOK() ); + + pRecovered.insert( pOngoing.begin(), pOngoing.end() ); + pOngoing.clear(); + + // since we have a brand new source, we need + // to restart transfer rate statistics + pTransferTime = 0; + pStartTime = time( 0 ); + pDataTransfered = 0; + + return st; +} + +XRootDStatus XCpSrc::ReadChunks() +{ + XrdSysMutexHelper lck( pMtx ); + + while( pOngoing.size() < pParallel && !pRecovered.empty() ) + { + std::pair p; + std::map::iterator itr = pRecovered.begin(); + p = *itr; + pOngoing.insert( p ); + pRecovered.erase( itr ); + + char *buffer = new char[p.second]; + ChunkHandler *handler = new ChunkHandler( this, p.first, p.second, buffer, pFile ); + XRootDStatus st = pFile->Read( p.first, p.second, buffer, handler ); + if( !st.IsOK() ) + { + delete[] buffer; + delete handler; + ReportResponse( new XRootDStatus( st ), 0, pFile ); + return st; + } + } + + while( pOngoing.size() < pParallel && pCurrentOffset < pBlkEnd ) + { + uint64_t chunkSize = pChunkSize; + if( pCurrentOffset + chunkSize > pBlkEnd ) + chunkSize = pBlkEnd - pCurrentOffset; + pOngoing[pCurrentOffset] = chunkSize; + char *buffer = new char[chunkSize]; + ChunkHandler *handler = new ChunkHandler( this, pCurrentOffset, chunkSize, buffer, pFile ); + XRootDStatus st = pFile->Read( pCurrentOffset, chunkSize, buffer, handler ); + pCurrentOffset += chunkSize; + if( !st.IsOK() ) + { + delete[] buffer; + delete handler; + ReportResponse( new XRootDStatus( st ), 0, pFile ); + return st; + } + } + + if( pOngoing.empty() ) return XRootDStatus( stOK, suDone ); + + if( pRecovered.empty() && pCurrentOffset >= pBlkEnd ) return XRootDStatus( stOK, suPartial ); + + return XRootDStatus( stOK, suContinue ); +} + +void XCpSrc::ReportResponse( XRootDStatus *status, ChunkInfo *chunk, File *handle ) +{ + XrdSysMutexHelper lck( pMtx ); + bool ignore = false; + + if( status->IsOK() ) + { + // if the status is OK remove it from + // the list of ongoing transfers, if it + // 
was not on the list we ignore the + // response (this could happen due to + // source change or stealing) + ignore = !pOngoing.erase( chunk->offset ); + } + else if( FilesEqual( pFile, handle ) ) + { + // if the status is NOT OK, and pFile + // match the handle it means that we see + // an error for the first time, map the + // broken file to the number of outstanding + // asynchronous operations and reset the pointer + pFailed[pFile] = pOngoing.size(); + pFile = 0; + } + else + DeletePtr( status ); + + if( !FilesEqual( pFile, handle ) ) + { + // if the pFile does not match the handle, + // it means that this response came from + // a broken source, decrement the count of + // outstanding async operations for this src, + --pFailed[handle]; + if( pFailed[handle] == 0 ) + { + // if this was the last outstanding operation + // close the file and delete it + pFailed.erase( handle ); + handle->Close(); + delete handle; + } + } + + lck.UnLock(); + + if( status ) pReports.Put( status ); + + if( ignore ) + { + DeleteChunk( chunk ); + return; + } + + if( chunk ) + { + pDataTransfered += chunk->length; + pCtx->PutChunk( chunk ); + } +} + +void XCpSrc::Steal( XCpSrc *src ) +{ + if( !src ) return; + + XrdSysMutexHelper lck1( pMtx ), lck2( src->pMtx ); + + Log *log = DefaultEnv::GetLog(); + std::string myHost = URL( pUrl ).GetHostName(), srcHost = URL( src->pUrl ).GetHostName(); + + if( !src->pRunning ) + { + // the source we are stealing from is in error state, we can have everything + + pRecovered.insert( src->pOngoing.begin(), src->pOngoing.end() ); + pRecovered.insert( src->pRecovered.begin(), src->pRecovered.end() ); + pCurrentOffset = src->pCurrentOffset; + pBlkEnd = src->pBlkEnd; + + src->pOngoing.clear(); + src->pRecovered.clear(); + src->pCurrentOffset = 0; + src->pBlkEnd = 0; + + // a broken source might be waiting for + // someone to take over his data, so we + // need to notify + pCtx->NotifyIdleSrc(); + + log->Debug( UtilityMsg, "s%: Stealing everything from %s", 
myHost.c_str(), srcHost.c_str() ); + + return; + } + + // the source we are stealing from is just slower, only take part of its work + // so we want a fraction of its work we want for ourself + uint64_t myTransferRate = TransferRate(), srcTransferRate = src->TransferRate(); + if( myTransferRate == 0 ) return; + double fraction = double( myTransferRate ) / double( myTransferRate + srcTransferRate ); + + if( src->pCurrentOffset < src->pBlkEnd ) + { + // the source still has a block of data + uint64_t blkSize = src->pBlkEnd - src->pCurrentOffset; + uint64_t steal = static_cast( round( fraction * blkSize ) ); + // if after stealing there will be less than one chunk + // take everything + if( blkSize - steal <= pChunkSize ) + steal = blkSize; + + pCurrentOffset = src->pBlkEnd - steal; + pBlkEnd = src->pBlkEnd; + src->pBlkEnd -= steal; + + log->Debug( UtilityMsg, "s%: Stealing fraction (%f) of block from %s", myHost.c_str(), fraction, srcHost.c_str() ); + + return; + } + + if( !src->pRecovered.empty() ) + { + size_t count = static_cast( round( fraction * src->pRecovered.size() ) ); + while( count-- ) + { + std::map::iterator itr = src->pRecovered.begin(); + pRecovered.insert( *itr ); + src->pRecovered.erase( itr ); + } + + log->Debug( UtilityMsg, "s%: Stealing fraction (%f) of recovered chunks from %s", myHost.c_str(), fraction, srcHost.c_str() ); + + return; + } + + // * a fraction < 0.5 means that we are actually slower (so it does + // not make sense to steal ongoing's from someone who's faster) + // * a fraction ~ 0.5 means that we have more or less the same transfer + // rate (similarly, it doesn't make sense to steal) + // * the source needs to be really faster (though, this is an arbitrary + // choice) to actually steal something + if( !src->pOngoing.empty() && fraction > 0.7 ) + { + size_t count = static_cast( round( fraction * src->pOngoing.size() ) ); + while( count-- ) + { + std::map::iterator itr = src->pOngoing.begin(); + pRecovered.insert( *itr ); + 
src->pOngoing.erase( itr ); + } + + log->Debug( UtilityMsg, "s%: Stealing fraction (%f) of ongoing chunks from %s", myHost.c_str(), fraction, srcHost.c_str() ); + } +} + +XRootDStatus XCpSrc::GetWork() +{ + std::pair p = pCtx->GetBlock(); + + if( p.second > 0 ) + { + XrdSysMutexHelper lck( pMtx ); + pCurrentOffset = p.first; + pBlkEnd = p.first + p.second; + + Log *log = DefaultEnv::GetLog(); + std::string myHost = URL( pUrl ).GetHostName(); + log->Debug( UtilityMsg, "s% got next block", myHost.c_str() ); + + return XRootDStatus(); + } + + XCpSrc *wLink = pCtx->WeakestLink( this ); + Steal( wLink ); + + // if we managed to steal something declare success + if( pCurrentOffset < pBlkEnd || !pRecovered.empty() ) return XRootDStatus(); + // otherwise return an error + return XRootDStatus( stError, errInvalidOp ); +} + +uint64_t XCpSrc::TransferRate() +{ + return pDataTransfered / ( pTransferTime + time( 0 ) - pStartTime ); +} + +} /* namespace XrdCl */ diff --git a/src/XrdCl/XrdClXCpSrc.hh b/src/XrdCl/XrdClXCpSrc.hh new file mode 100644 index 00000000000..e0fa6212bcf --- /dev/null +++ b/src/XrdCl/XrdClXCpSrc.hh @@ -0,0 +1,364 @@ +//------------------------------------------------------------------------------ +// Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN) +// Author: Michal Simon +//------------------------------------------------------------------------------ +// This file is part of the XRootD software suite. +// +// XRootD is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// XRootD is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. 
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with XRootD.  If not, see <http://www.gnu.org/licenses/>.
+//
+// In applying this licence, CERN does not waive the privileges and immunities
+// granted to it by virtue of its status as an Intergovernmental Organization
+// or submit itself to any jurisdiction.
+//------------------------------------------------------------------------------
+
+#ifndef SRC_XRDCL_XRDCLXCPSRC_HH_
+#define SRC_XRDCL_XRDCLXCPSRC_HH_
+
+#include "XrdCl/XrdClFile.hh"
+#include "XrdCl/XrdClSyncQueue.hh"
+#include "XrdSys/XrdSysPthread.hh"
+
+namespace XrdCl
+{
+
+class XCpCtx;
+
+class XCpSrc
+{
+    friend class ChunkHandler;
+
+  public:
+
+    /**
+     * Constructor.
+     *
+     * @param chunkSize : default chunk size
+     * @param parallel  : number of parallel chunks
+     * @param fileSize  : file size if available (e.g. in metalink file),
+     *                    should be set to -1 if not available, in this case
+     *                    a stat will be performed during initialization
+     * @param ctx       : Extreme Copy context
+     */
+    XCpSrc( uint32_t chunkSize, uint8_t parallel, int64_t fileSize, XCpCtx *ctx );
+
+    /**
+     * Creates new thread with XCpSrc::Run as the start routine.
+     */
+    void Start();
+
+    /**
+     * Clears the running flag, the download loop exits on its next check.
+     */
+    void Stop()
+    {
+      pRunning = false;
+    }
+
+    /**
+     * Decrements the reference counter and deletes the instance once it reaches 0.
+     */
+    void Delete()
+    {
+      XrdSysMutexHelper lck( pMtx );
+      const bool last = ( --pRefCount == 0 );
+      lck.UnLock(); // release pMtx first: the helper must not unlock a mutex that lives in freed memory
+      if( last ) delete this;
+    }
+
+    /**
+     * Increments the reference counter.
+     *
+     * @return : myself.
+     */
+    XCpSrc* Self()
+    {
+      XrdSysMutexHelper lck( pMtx );
+      ++pRefCount;
+      return this;
+    }
+
+    /**
+     * @return : true if the thread is running, false otherwise
+     */
+    bool IsRunning()
+    {
+      return pRunning;
+    }
+
+    /**
+     * @return true if the source has a block of non zero
+     *         size / some chunks allocated, false otherwise
+     */
+    bool HasData()
+    {
+      XrdSysMutexHelper lck( pMtx );
+      return pCurrentOffset < pBlkEnd || !pRecovered.empty() || !pOngoing.empty();
+    }
+
+
+
+    /**
+     * Get the transfer rate for current source
+     *
+     * @return : transfer rate for current source [B/s]
+     */
+    uint64_t TransferRate();
+
+    /**
+     * Delete ChunkInfo object, and set the pointer to null.
+     *
+     * @param chunk : the chunk to be deleted
+     */
+    static void DeleteChunk( ChunkInfo *&chunk )
+    {
+      if( chunk )
+      {
+        delete[] static_cast<char*>( chunk->buffer ); // the buffer was allocated as char[] in ReadChunks
+        delete chunk;
+        chunk = 0;
+      }
+    }
+
+  private:
+
+    /**
+     * Destructor (private).
+     *
+     * Use Delete() method to destroy the object.
+     */
+    virtual ~XCpSrc();
+
+    /**
+     * The start routine.
+     */
+    static void* Run( void* arg );
+
+    /**
+     * Initializes the object first.
+     * Afterwards, starts the download.
+     */
+    void StartDownloading();
+
+    /**
+     * Initializes the object:
+     * - Opens a file (retries with another
+     *   URL, in case of failure)
+     * - Stats the file if necessary
+     * - Gets the first block (offset and size)
+     *   for download
+     *
+     * @return : error in case the object could not be initialized
+     */
+    XRootDStatus Initialize();
+
+    /**
+     * Tries to open the file at the next available URL.
+     * Moves all ongoing chunks to recovered.
+     *
+     * @return : error if run out of URLs to try,
+     *           success otherwise
+     */
+    XRootDStatus Recover();
+
+    /**
+     * Asynchronously reads consecutive chunks.
+     *
+     * @return : operation status:
+     *           - suContinue : I still have work to do
+     *           - suPartial  : I only have ongoing transfers,
+     *                          but the block has been consumed
+     *           - suDone     : We are done, the block has been
+     *                          consumed, there are no ongoing
+     *                          transfers, and there are no new
+     *                          data
+     */
+    XRootDStatus ReadChunks();
+
+    /**
+     * Steal work from given source.
+     *
+     * - if it is a failed source we can have everything
+     * - otherwise, if the source has a block of size
+     *   greater than 0, steal respective fraction of
+     *   the block
+     * - otherwise, if the source has recovered chunks,
+     *   steal respective fraction of those chunks
+     * - otherwise, steal respective fraction of ongoing
+     *   chunks, if we are a faster source
+     *
+     * @param src : the source from whom we are stealing
+     */
+    void Steal( XCpSrc *src );
+
+    /**
+     * Get more work.
+     * First try to get a new block.
+     * If there are no blocks remaining,
+     * try stealing from others.
+     *
+     * @return : error if we didn't get any data to transfer
+     */
+    XRootDStatus GetWork();
+
+    /**
+     * This method is used by ChunkHandler to report the result of a read,
+     * to the source object.
+     *
+     * @param status : operation status
+     * @param chunk  : the read chunk (if operation failed, should be null)
+     * @param handle : the file object used to read the chunk
+     */
+    void ReportResponse( XRootDStatus *status, ChunkInfo *chunk, File *handle );
+
+    /**
+     * Deletes a pointer and sets it to null.
+     */
+    template<typename T>
+    static void DeletePtr( T *&obj )
+    {
+      delete obj;
+      obj = 0;
+    }
+
+    /**
+     * Check if two file objects point to the same URL.
+     *
+     * @return : true if both files point to the same URL,
+     *           false otherwise
+     */
+    static bool FilesEqual( File *f1, File *f2 )
+    {
+      if( !f1 || !f2 ) return false; // a null handle never matches, not even another null
+
+      const std::string lastURL = "LastURL";
+      std::string url1, url2;
+
+      f1->GetProperty( lastURL, url1 );
+      f2->GetProperty( lastURL, url2 );
+
+      // remove cgi information
+      size_t pos = url1.find( '?' );
+      if( pos != std::string::npos )
+        url1 = url1.substr( 0 , pos );
+      pos = url2.find( '?' );
+      if( pos != std::string::npos )
+        url2 = url2.substr( 0 , pos );
+
+      return url1 == url2;
+    }
+
+    /**
+     * Default chunk size
+     */
+    uint32_t                           pChunkSize;
+
+    /**
+     * Number of parallel chunks
+     */
+    uint8_t                            pParallel;
+
+    /**
+     * The file size
+     */
+    int64_t                            pFileSize;
+
+    /**
+     * Thread id
+     */
+    pthread_t                          pThread;
+
+    /**
+     * Extreme Copy context
+     */
+    XCpCtx                            *pCtx;
+
+    /**
+     * Source URL.
+     */
+    std::string                        pUrl;
+
+    /**
+     * Handle to the file.
+     */
+    File                              *pFile;
+
+    std::map<File*, size_t>            pFailed; // broken file -> number of async reads still outstanding on it
+
+    /**
+     * The offset of the next chunk to be transferred.
+     */
+    uint64_t                           pCurrentOffset;
+
+    /**
+     * End of our block.
+     */
+    uint64_t                           pBlkEnd;
+
+    /**
+     * Total number of bytes transferred from this source.
+     */
+    uint64_t                           pDataTransfered;
+
+    /**
+     * A map of ongoing transfers (the offset is the key,
+     * the chunk size is the value).
+     */
+    std::map<uint64_t, uint64_t>       pOngoing;
+
+    /**
+     * A map of stolen chunks (again the offset is the key,
+     * the chunk size is the value).
+     */
+    std::map<uint64_t, uint64_t>       pRecovered;
+
+    /**
+     * Sync queue with reports (statuses) from async reads
+     * that have been issued. An error appears only once
+     * per URL (independently of how many concurrent async
+     * reads are allowed).
+     */
+    SyncQueue<XRootDStatus*>           pReports;
+
+    /**
+     * A mutex guarding the object.
+     */
+    XrdSysRecMutex                     pMtx;
+
+    /**
+     * Reference counter
+     */
+    size_t                             pRefCount;
+
+    /**
+     * A flag, true means the source is running,
+     * false means the source has been stopped,
+     * or failed.
+     */
+    bool                               pRunning;
+
+    /**
+     * The time when we started / restarted chunks
+     */
+    time_t                             pStartTime;
+
+    /**
+     * The total time we were transferring data, before
+     * the restart
+     */
+    time_t                             pTransferTime;
+};
+
+} /* namespace XrdCl */
+
+#endif /* SRC_XRDCL_XRDCLXCPSRC_HH_ */
diff --git a/tests/XrdClTests/FileCopyTest.cc b/tests/XrdClTests/FileCopyTest.cc index e648b6d10b3..0a0a8323ac5 100644 --- a/tests/XrdClTests/FileCopyTest.cc +++ b/tests/XrdClTests/FileCopyTest.cc @@ -329,14 +329,15 @@ void FileCopyTest::CopyTestFunc( bool thirdParty ) CPPUNIT_ASSERT( testEnv->GetString( "RemoteFile", sourceFile ) ); CPPUNIT_ASSERT( testEnv->GetString( "DataPath", dataPath ) ); - std::string sourceURL = manager1 + "/" + sourceFile; - std::string targetPath = dataPath + "/tpcFile"; - std::string targetURL = manager2 + "/" + targetPath; - std::string metalinkURL = metamanager + "/" + dataPath + "/metalink/mlTpcTest.meta4"; - std::string zipURL = metamanager + "/" + dataPath + "/data.zip"; - std::string fileInZip = "paper.txt"; - - CopyProcess process1, process2, process3, process4, process5, process6; + std::string sourceURL = manager1 + "/" + sourceFile; + std::string targetPath = dataPath + "/tpcFile"; + std::string targetURL = manager2 + "/" + targetPath; + std::string metalinkURL = metamanager + "/" + dataPath + "/metalink/mlTpcTest.meta4"; + std::string zipURL = metamanager + "/" + dataPath + "/data.zip"; + std::string fileInZip = "paper.txt"; + std::string xcpSourceURL = metamanager + "/" + dataPath + "/1db882c8-8cd6-4df1-941f-ce669bad3458.dat"; + + CopyProcess process1, process2, process3, process4, process5, process6, process7; PropertyList properties, results; FileSystem fs( manager2 ); @@ -368,6 +369,20 @@ void FileCopyTest::CopyTestFunc( bool thirdParty ) CPPUNIT_ASSERT_XRDST( fs.Rm( targetPath ) ); properties.Clear(); + // XCp test + results.Clear(); + properties.Set( "source", xcpSourceURL ); + properties.Set( "target", targetURL ); + properties.Set( 
"checkSumMode", "end2end" ); + properties.Set( "checkSumType", "zcrc32" ); + properties.Set( "xcp", true ); + properties.Set( "nbXcpSources", 3 ); + CPPUNIT_ASSERT_XRDST( process7.AddJob( properties, &results ) ); + CPPUNIT_ASSERT_XRDST( process7.Prepare() ); + CPPUNIT_ASSERT_XRDST( process7.Run(0) ); + CPPUNIT_ASSERT_XRDST( fs.Rm( targetPath ) ); + properties.Clear(); + //---------------------------------------------------------------------------- // Initialize and run the copy //----------------------------------------------------------------------------