Skip to content

Commit

Permalink
[XrdCl] Extreme copy implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
simonmichal committed Apr 12, 2017
1 parent 768b872 commit 9849c72
Show file tree
Hide file tree
Showing 17 changed files with 1,611 additions and 20 deletions.
5 changes: 5 additions & 0 deletions docs/man/xrdcp.1
Expand Up @@ -446,6 +446,11 @@ XRD_GLFNREDIRECTOR
The redirector will be used as a last resort if the GLFN tag is specified in a Metalink file.
.RE

XRD_XCPBLOCKSIZE
.RS 5
Maximum size of a data block assigned to a single source in case of an extreme copy transfer.
.RE

.SH NOTES
Documentation for all components associated with \fBxrdcp\fR can be found at
http://xrootd.org/docs.html
Expand Down
2 changes: 2 additions & 0 deletions src/XrdCl/CMakeLists.txt
Expand Up @@ -65,6 +65,8 @@ add_library(
XrdClMetalinkRedirector.cc XrdClMetalinkRedirector.hh
XrdClRedirectorRegistry.cc XrdClRedirectorRegistry.hh
XrdClZipArchiveReader.cc XrdClZipArchiveReader.hh
XrdClXCpCtx.cc XrdClXCpCtx.hh
XrdClXCpSrc.cc XrdClXCpSrc.hh
)

target_link_libraries(
Expand Down
148 changes: 145 additions & 3 deletions src/XrdCl/XrdClClassicCopyJob.cc
Expand Up @@ -34,7 +34,6 @@
#include "XrdCl/XrdClUglyHacks.hh"
#include "XrdCl/XrdClRedirectorRegistry.hh"
#include "XrdCl/XrdClZipArchiveReader.hh"

#include <memory>
#include <iostream>
#include <queue>
Expand All @@ -45,6 +44,7 @@
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
#include "XrdClXCpCtx.hh"

namespace
{
Expand Down Expand Up @@ -1078,6 +1078,139 @@ namespace
bool pDone;
};

//----------------------------------------------------------------------------
//! XRootDSourceXCp
//!
//! Copy source used for extreme copy (xcp): the replicas of the source file
//! are collected and handed over to an XrdCl::XCpCtx, which is then asked
//! for the consecutive data chunks.
//----------------------------------------------------------------------------
class XRootDSourceXCp: public Source
{
  public:
    //------------------------------------------------------------------------
    //! Constructor
    //!
    //! @param url            source URL (not owned, has to outlive the object)
    //! @param chunkSize      chunk size, forwarded to the XCpCtx
    //! @param parallelChunks number of parallel chunks, forwarded to the
    //!                       XCpCtx
    //! @param nbSrc          number of sources, forwarded to the XCpCtx
    //! @param blockSize      size of a data block assigned to a single
    //!                       source, forwarded to the XCpCtx
    //------------------------------------------------------------------------
    XRootDSourceXCp( const XrdCl::URL* url, uint32_t chunkSize, uint16_t parallelChunks, int32_t nbSrc, uint64_t blockSize ):
      pXCpCtx( 0 ), pUrl( url ), pChunkSize( chunkSize ), pParallelChunks( parallelChunks ), pNbSrc( nbSrc ), pBlockSize( blockSize )
    {
    }

    //------------------------------------------------------------------------
    //! Destructor, gives up the extreme copy context (if one was created)
    //! through its Delete() method
    //------------------------------------------------------------------------
    ~XRootDSourceXCp()
    {
      if( pXCpCtx )
        pXCpCtx->Delete();
    }

    //------------------------------------------------------------------------
    //! Initialize the source
    //!
    //! For a metalink URL the file size and the replica list are taken from
    //! the redirector registered for that URL. Otherwise the replicas are
    //! discovered with a deep locate and the file size is passed to the
    //! context as -1.
    //!
    //! @return status of the deep locate if it failed, otherwise the status
    //!         of XCpCtx::Initialize
    //------------------------------------------------------------------------
    virtual XrdCl::XRootDStatus Initialize()
    {
      XrdCl::Log *log = XrdCl::DefaultEnv::GetLog();
      int64_t fileSize = -1;

      if( pUrl->IsMetalink() )
      {
        // NOTE(review): the redirector is dereferenced without a null check,
        // presumably a metalink URL is always registered at this point -
        // verify against RedirectorRegistry::Get
        XrdCl::RedirectorRegistry &registry = XrdCl::RedirectorRegistry::Instance();
        XrdCl::VirtualRedirector *redirector = registry.Get( *pUrl );
        fileSize = redirector->GetSize();
        pReplicas = redirector->GetReplicas();
      }
      else
      {
        XrdCl::LocationInfo *li = 0;
        XrdCl::FileSystem fs( *pUrl );
        XrdCl::XRootDStatus st = fs.DeepLocate( pUrl->GetPath(), XrdCl::OpenFlags::Compress | XrdCl::OpenFlags::PrefName, li );
        if( !st.IsOK() ) return st;

        // every reported location becomes a separate replica URL
        XrdCl::LocationInfo::Iterator locItr;
        for( locItr = li->Begin(); locItr != li->End(); ++locItr )
        {
          std::string url = "root://" + locItr->GetAddress() + "/" + pUrl->GetPath();
          pReplicas.push_back( url );
        }

        delete li;
      }

      std::stringstream ss;
      ss << "XCp sources: ";

      std::vector<std::string>::iterator itr;
      for( itr = pReplicas.begin() ; itr != pReplicas.end() ; ++itr )
      {
        ss << *itr << ", ";
      }
      // The replica list must not be used as the format string: URLs may
      // contain '%' (e.g. percent-encoded characters), which a printf-style
      // formatter would treat as conversion specifiers.
      log->Debug( XrdCl::UtilityMsg, "%s", ss.str().c_str() );

      pXCpCtx = new XrdCl::XCpCtx( pReplicas, pBlockSize, pNbSrc, pChunkSize, pParallelChunks, fileSize );

      return pXCpCtx->Initialize();
    }

    //------------------------------------------------------------------------
    //! Get size
    //!
    //! @return the size reported by the extreme copy context; the context
    //!         only exists after Initialize() has created it
    //------------------------------------------------------------------------
    virtual int64_t GetSize()
    {
      return pXCpCtx->GetSize();
    }

    //------------------------------------------------------------------------
    //! Get a data chunk from the source
    //!
    //! The context is queried again for as long as it answers with an OK
    //! status carrying the suRetry code; the first other status is returned.
    //!
    //! @param  ci     chunk information
    //! @return        status of the operation
    //!                suContinue - there are some chunks left
    //!                suDone     - no chunks left
    //------------------------------------------------------------------------
    virtual XrdCl::XRootDStatus GetChunk( XrdCl::ChunkInfo &ci )
    {
      XrdCl::XRootDStatus st;
      do
      {
        st = pXCpCtx->GetChunk( ci );
      }
      while( st.IsOK() && st.code == XrdCl::suRetry );
      return st;
    }

    //------------------------------------------------------------------------
    //! Get check sum
    //!
    //! For a metalink URL the redirector is asked first and a non-empty
    //! answer is returned right away. Otherwise the replicas are queried one
    //! by one and the first successful answer wins.
    //!
    //! @param checkSum     the resulting check sum
    //! @param checkSumType type of the check sum
    //! @return             OK status on success, errNoMoreReplicas if no
    //!                     replica delivered a check sum
    //------------------------------------------------------------------------
    virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
                                             std::string &checkSumType )
    {
      if( pUrl->IsMetalink() )
      {
        XrdCl::RedirectorRegistry &registry = XrdCl::RedirectorRegistry::Instance();
        XrdCl::VirtualRedirector *redirector = registry.Get( *pUrl );
        checkSum = redirector->GetCheckSum( checkSumType );
        if( !checkSum.empty() ) return XrdCl::XRootDStatus();
      }

      std::vector<std::string>::iterator itr;
      for( itr = pReplicas.begin() ; itr != pReplicas.end() ; ++itr )
      {
        XrdCl::URL url( *itr );
        XrdCl::XRootDStatus st = XrdCl::Utils::GetRemoteCheckSum( checkSum,
                                   checkSumType, url.GetHostId(), url.GetPath() );
        if( st.IsOK() ) return st;
      }

      return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errNoMoreReplicas );
    }

  private:

    XrdCl::XCpCtx            *pXCpCtx;         // created in Initialize()
    const XrdCl::URL         *pUrl;            // source URL, not owned
    std::vector<std::string>  pReplicas;       // replica URLs
    uint32_t                  pChunkSize;
    uint16_t                  pParallelChunks;
    int32_t                   pNbSrc;
    uint64_t                  pBlockSize;
};

//----------------------------------------------------------------------------
//! Local destination
//----------------------------------------------------------------------------
Expand Down Expand Up @@ -1613,7 +1746,9 @@ namespace XrdCl
std::string zipSource;
uint16_t parallelChunks;
uint32_t chunkSize;
bool posc, force, coerce, makeDir, dynamicSource, zip;
uint64_t blockSize;
bool posc, force, coerce, makeDir, dynamicSource, zip, xcp;
int32_t nbXcpSources;

pProperties->Get( "checkSumMode", checkSumMode );
pProperties->Get( "checkSumType", checkSumType );
Expand All @@ -1626,15 +1761,22 @@ namespace XrdCl
pProperties->Get( "makeDir", makeDir );
pProperties->Get( "dynamicSource", dynamicSource );
pProperties->Get( "zipArchive", zip );
pProperties->Get( "xcp", xcp );
pProperties->Get( "xcpBlockSize", blockSize );

if( zip )
pProperties->Get( "zipSource", zipSource );

if( xcp )
pProperties->Get( "nbXcpSources", nbXcpSources );

//--------------------------------------------------------------------------
// Initialize the source and the destination
//--------------------------------------------------------------------------
XRDCL_SMART_PTR_T<Source> src;
if( zip )
if( xcp )
src.reset( new XRootDSourceXCp( &GetSource(), chunkSize, parallelChunks, nbXcpSources, blockSize ) );
else if( zip ) // TODO make zip work for xcp
src.reset( new XRootDSourceZip( zipSource, &GetSource(), chunkSize, parallelChunks ) );
else if( GetSource().GetProtocol() == "file" )
src.reset( new LocalSource( &GetSource(), checkSumType, chunkSize ) );
Expand Down
1 change: 1 addition & 0 deletions src/XrdCl/XrdClConstants.hh
Expand Up @@ -66,6 +66,7 @@ namespace XrdCl
const int DefaultParallelEvtLoop = 1;
const int DefaultMetalinkProcessing = 1;
const int DefaultLocalMetalinkFile = 1;
const int DefaultXCpBlockSize = 134217728; // DefaultCPChunkSize * DefaultCPParallelChunks * 2

const char * const DefaultPollerPreference = "built-in";
const char * const DefaultNetworkStack = "IPAuto";
Expand Down
25 changes: 19 additions & 6 deletions src/XrdCl/XrdClCopy.cc
Expand Up @@ -306,12 +306,6 @@ bool AllOptionsSupported( XrdCpConfig *config )
return false;
}

if( config->nSrcs != 1 )
{
std::cerr << "Multiple sources are not yet supported" << std::endl;
return false;
}

return true;
}

Expand Down Expand Up @@ -612,6 +606,17 @@ int main( int argc, char **argv )
zip = true;
}

//----------------------------------------------------------------------------
// Extreme Copy
//----------------------------------------------------------------------------
int nbSources = 0;
bool xcp = false;
if( config.Want( XrdCpConfig::DoSources ) )
{
nbSources = config.nSrcs;
xcp = true;
}

//----------------------------------------------------------------------------
// Environment settings
//----------------------------------------------------------------------------
Expand All @@ -622,6 +627,9 @@ int main( int argc, char **argv )
int chunkSize = DefaultCPChunkSize;
env->GetInt( "CPChunkSize", chunkSize );

int blockSize = DefaultXCpBlockSize;
env->GetInt( "XCpBlockSize", blockSize );

int parallelChunks = DefaultCPParallelChunks;
env->GetInt( "CPParallelChunks", parallelChunks );
if( parallelChunks < 1 ||
Expand Down Expand Up @@ -795,10 +803,15 @@ int main( int argc, char **argv )
properties.Set( "chunkSize", chunkSize );
properties.Set( "parallelChunks", parallelChunks );
properties.Set( "zipArchive", zip );
properties.Set( "xcp", xcp );
properties.Set( "xcpBlockSize", blockSize );

if( zip )
properties.Set( "zipSource", zipFile );

if( xcp )
properties.Set( "nbXcpSources", nbSources );


XRootDStatus st = process.AddJob( properties, results );
if( !st.IsOK() )
Expand Down
9 changes: 8 additions & 1 deletion src/XrdCl/XrdClCopyProcess.cc
Expand Up @@ -167,7 +167,7 @@ namespace XrdCl
pJobProperties.push_back( properties );
PropertyList &p = pJobProperties.back();

const char *bools[] = {"target", "force", "posc", "coerce", "makeDir", "zipArchive", 0};
const char *bools[] = {"target", "force", "posc", "coerce", "makeDir", "zipArchive", "xcp", 0};
for( int i = 0; bools[i]; ++i )
if( !p.HasProperty( bools[i] ) )
p.Set( bools[i], false );
Expand Down Expand Up @@ -201,6 +201,13 @@ namespace XrdCl
p.Set( "chunkSize", val );
}

if( !p.HasProperty( "xcpBlockSize" ) )
{
int val = DefaultXCpBlockSize;
env->GetInt( "XCpBlockSize", val );
p.Set( "xcpBlockSize", val );
}

if( !p.HasProperty( "initTimeout" ) )
{
int val = DefaultCPInitTimeout;
Expand Down
1 change: 1 addition & 0 deletions src/XrdCl/XrdClDefaultEnv.cc
Expand Up @@ -285,6 +285,7 @@ namespace XrdCl
REGISTER_VAR_INT( varsInt, "ParallelEvtLoop", DefaultParallelEvtLoop );
REGISTER_VAR_INT( varsInt, "MetalinkProcessing", DefaultMetalinkProcessing );
REGISTER_VAR_INT( varsInt, "LocalMetalinkFile", DefaultLocalMetalinkFile );
REGISTER_VAR_INT( varsInt, "XCpBlockSize", DefaultXCpBlockSize );

REGISTER_VAR_STR( varsStr, "PollerPreference", DefaultPollerPreference );
REGISTER_VAR_STR( varsStr, "ClientMonitor", DefaultClientMonitor );
Expand Down
14 changes: 13 additions & 1 deletion src/XrdCl/XrdClMetalinkRedirector.hh
Expand Up @@ -78,11 +78,23 @@ class MetalinkRedirector : public VirtualRedirector
return type + ":" + it->second;
}

//----------------------------------------------------------------------------
//! Returns the file size if specified in the metalink file,
//! otherwise a negative number
//!
//! @return the value of the pFileSize member
//----------------------------------------------------------------------------
long long GetSize() const
{
return pFileSize;
}

//----------------------------------------------------------------------------
//! Returns a vector with replicas as given in the metalink file
//!
//! @return const reference to the pReplicas member (no copy is made here)
//----------------------------------------------------------------------------
const std::vector<std::string>& GetReplicas()
{
return pReplicas;
}

private:

//----------------------------------------------------------------------------
Expand Down Expand Up @@ -144,7 +156,7 @@ class MetalinkRedirector : public VirtualRedirector

typedef std::list< std::pair<const Message*, Stream*> > RedirectList;
typedef std::map<std::string, std::string> CksumMap;
typedef std::list<std::string> ReplicaList;
typedef std::vector<std::string> ReplicaList;

RedirectList pPendingRedirects;
std::string pUrl;
Expand Down
5 changes: 5 additions & 0 deletions src/XrdCl/XrdClRedirectorRegistry.hh
Expand Up @@ -60,6 +60,11 @@ class VirtualRedirector
//! or a negative number if size was not specified
//----------------------------------------------------------------------------
virtual long long GetSize() const = 0;

//----------------------------------------------------------------------------
//! Returns a vector with replicas as given in the metalink file
//----------------------------------------------------------------------------
virtual const std::vector<std::string>& GetReplicas() = 0;
};

//--------------------------------------------------------------------------------
Expand Down
1 change: 1 addition & 0 deletions src/XrdCl/XrdClStatus.cc
Expand Up @@ -44,6 +44,7 @@ namespace
{ errNotSupported, "Operation not supported" },
{ errDataError, "Received corrupted data" },
{ errNotImplemented, "Operation is not implemented" },
{ errNoMoreReplicas, "No more replicas to try" },
{ errInvalidAddr, "Invalid address" },
{ errSocketError, "Socket error" },
{ errSocketTimeout, "Socket timeout" },
Expand Down
1 change: 1 addition & 0 deletions src/XrdCl/XrdClStatus.hh
Expand Up @@ -61,6 +61,7 @@ namespace XrdCl
const uint16_t errNotSupported = 13;
const uint16_t errDataError = 14; //!< data is corrupted
const uint16_t errNotImplemented = 15; //!< Operation is not implemented
const uint16_t errNoMoreReplicas = 16; //!< No more replicas to try

//----------------------------------------------------------------------------
// Socket related errors
Expand Down
11 changes: 10 additions & 1 deletion src/XrdCl/XrdClSyncQueue.hh
Expand Up @@ -88,7 +88,16 @@ namespace XrdCl
pSem = new Semaphore(0);
}

private:
//------------------------------------------------------------------------
//! Tell whether the queue currently holds no items
//!
//! @return true if the underlying queue is empty, false otherwise; the
//!         queue is inspected while pMutex is held by the helper
//------------------------------------------------------------------------
bool IsEmpty()
{
  XrdSysMutexHelper guard( pMutex );
  const bool hasNoItems = pQueue.empty();
  return hasNoItems;
}

protected:
std::queue<Item> pQueue;
XrdSysMutex pMutex;
Semaphore *pSem;
Expand Down

0 comments on commit 9849c72

Please sign in to comment.