Skip to content

Commit

Permalink
[XrdCl] xrdcp: use pgread/pgwrite only if server supports it.
Browse files Browse the repository at this point in the history
  • Loading branch information
simonmichal authored and gganis committed Nov 23, 2021
1 parent 73ddfb3 commit ce38052
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 37 deletions.
78 changes: 47 additions & 31 deletions src/XrdCl/XrdClClassicCopyJob.cc
Original file line number Diff line number Diff line change
Expand Up @@ -329,23 +329,6 @@ namespace
return XrdCl::XRootDStatus();
}

inline bool HasXAttr( const XrdCl::URL &url )
{
if( url.IsLocalFile() ) return true;
XrdCl::AnyObject qryResult;
XrdCl::XRootDStatus st = XrdCl::DefaultEnv::GetPostMaster()->
QueryTransport( url, XrdCl::XRootDQuery::ProtocolVersion, qryResult );
if( st.IsOK() )
{
int *protver = 0;
qryResult.Get( protver );
bool result = ( *protver == kXR_PROTXATTVERSION );
delete protver;
return result;
}
return false;
}

//----------------------------------------------------------------------------
//! Abstract chunk source
//----------------------------------------------------------------------------
Expand Down Expand Up @@ -792,15 +775,11 @@ namespace
pUrl( url ), pFile( new XrdCl::File() ), pSize( -1 ),
pCurrentOffset( 0 ), pChunkSize( chunkSize ),
pParallel( parallelChunks ),
pNbConn( 0 )
pNbConn( 0 ), pUsePgRead( false )
{
int val = XrdCl::DefaultSubStreamsPerChannel;
XrdCl::DefaultEnv::GetEnv()->GetInt( "SubStreamsPerChannel", val );
pMaxNbConn = val - 1; // account for the control stream

val = XrdCl::DefaultCpUsePgWrtRd;
XrdCl::DefaultEnv::GetEnv()->GetInt( "CpUsePgWrtRd", val );
pUsePgRead = !url->IsLocalFile() && ( val == 1 );
}

//------------------------------------------------------------------------
Expand Down Expand Up @@ -856,7 +835,15 @@ namespace
}

if( !pUrl->IsLocalFile() || ( pUrl->IsLocalFile() && pUrl->IsMetalink() ) )
{
pFile->GetProperty( "DataServer", pDataServer );
//--------------------------------------------------------------------
// Decide whether we can use PgRead
//--------------------------------------------------------------------
int val = XrdCl::DefaultCpUsePgWrtRd;
XrdCl::DefaultEnv::GetEnv()->GetInt( "CpUsePgWrtRd", val );
pUsePgRead = XrdCl::Utils::HasPgRW( pDataServer ) && ( val == 1 );
}

SetOnDataConnectHandler( pFile );

Expand Down Expand Up @@ -1224,6 +1211,17 @@ namespace
}
}

if( !pUrl->IsLocalFile() || ( pUrl->IsLocalFile() && pUrl->IsMetalink() ) )
{
pZipArchive->GetProperty( "DataServer", pDataServer );
//--------------------------------------------------------------------
// Decide whether we can use PgRead
//--------------------------------------------------------------------
int val = XrdCl::DefaultCpUsePgWrtRd;
XrdCl::DefaultEnv::GetEnv()->GetInt( "CpUsePgWrtRd", val );
pUsePgRead = XrdCl::Utils::HasPgRW( pDataServer ) && ( val == 1 );
}

SetOnDataConnectHandler( pZipArchive );

return XrdCl::XRootDStatus();
Expand Down Expand Up @@ -1352,11 +1350,8 @@ namespace
const std::vector<std::string> &addcks ):
Source( ckSumType, addcks ),
pUrl( url ), pFile( new XrdCl::File() ), pCurrentOffset( 0 ),
pChunkSize( chunkSize ), pDone( false )
pChunkSize( chunkSize ), pDone( false ), pUsePgRead( false )
{
int val = XrdCl::DefaultCpUsePgWrtRd;
XrdCl::DefaultEnv::GetEnv()->GetInt( "CpUsePgWrtRd", val );
pUsePgRead = !url->IsLocalFile() && ( val == 1 );
}

//------------------------------------------------------------------------
Expand Down Expand Up @@ -1397,6 +1392,18 @@ namespace
}
}

if( !pUrl->IsLocalFile() || ( pUrl->IsLocalFile() && pUrl->IsMetalink() ) )
{
std::string datasrv;
pFile->GetProperty( "DataServer", datasrv );
//--------------------------------------------------------------------
// Decide whether we can use PgRead
//--------------------------------------------------------------------
int val = XrdCl::DefaultCpUsePgWrtRd;
XrdCl::DefaultEnv::GetEnv()->GetInt( "CpUsePgWrtRd", val );
pUsePgRead = XrdCl::Utils::HasPgRW( datasrv ) && ( val == 1 );
}

return XRootDStatus();
}

Expand Down Expand Up @@ -1856,11 +1863,8 @@ namespace
const std::string &ckSumType ):
Destination( ckSumType ),
pUrl( url ), pFile( new XrdCl::File( XrdCl::File::DisableVirtRedirect ) ),
pParallel( parallelChunks ), pSize( -1 )
pParallel( parallelChunks ), pSize( -1 ), pUsePgWrt( false )
{
int val = XrdCl::DefaultCpUsePgWrtRd;
XrdCl::DefaultEnv::GetEnv()->GetInt( "CpUsePgWrtRd", val );
pUsePgWrt = !url.IsLocalFile() && ( val == 1 );
}

//------------------------------------------------------------------------
Expand Down Expand Up @@ -1936,6 +1940,18 @@ namespace
if( !st.IsOK() )
return st;

if( !pUrl.IsLocalFile() || ( pUrl.IsLocalFile() && pUrl.IsMetalink() ) )
{
std::string datasrv;
pFile->GetProperty( "DataServer", datasrv );
//--------------------------------------------------------------------
// Decide whether we can use PgRead
//--------------------------------------------------------------------
int val = XrdCl::DefaultCpUsePgWrtRd;
XrdCl::DefaultEnv::GetEnv()->GetInt( "CpUsePgWrtRd", val );
pUsePgWrt = XrdCl::Utils::HasPgRW( datasrv ) && ( val == 1 );
}

std::string cptarget = XrdCl::DefaultCpTarget;
XrdCl::DefaultEnv::GetEnv()->GetString( "CpTarget", cptarget );
if( !cptarget.empty() )
Expand Down Expand Up @@ -2820,7 +2836,7 @@ namespace XrdCl
//--------------------------------------------------------------------------
// Copy extended attributes
//--------------------------------------------------------------------------
if( preserveXAttr && HasXAttr( GetSource() ) && HasXAttr( GetTarget() ) )
if( preserveXAttr && Utils::HasXAttr( GetSource() ) && Utils::HasXAttr( GetTarget() ) )
{
std::vector<xattr_t> xattrs;
st = src->GetXAttr( xattrs );
Expand Down
32 changes: 28 additions & 4 deletions src/XrdCl/XrdClXCpSrc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "XrdCl/XrdClLog.hh"
#include "XrdCl/XrdClDefaultEnv.hh"
#include "XrdCl/XrdClConstants.hh"
#include "XrdCl/XrdClUtils.hh"

#include <cmath>
#include <cstdlib>
Expand Down Expand Up @@ -107,11 +108,8 @@ class ChunkHandler: public ResponseHandler
XCpSrc::XCpSrc( uint32_t chunkSize, uint8_t parallel, int64_t fileSize, XCpCtx *ctx ) :
pChunkSize( chunkSize ), pParallel( parallel ), pFileSize( fileSize ), pThread(),
pCtx( ctx->Self() ), pFile( 0 ), pCurrentOffset( 0 ), pBlkEnd( 0 ), pDataTransfered( 0 ), pRefCount( 1 ),
pRunning( false ), pStartTime( 0 ), pTransferTime( 0 )
pRunning( false ), pStartTime( 0 ), pTransferTime( 0 ), pUsePgRead( false )
{
int val = XrdCl::DefaultCpUsePgWrtRd;
XrdCl::DefaultEnv::GetEnv()->GetInt( "UsePgWrtRd", val );
pUsePgRead = ( val == 1 );
}

XCpSrc::~XCpSrc()
Expand Down Expand Up @@ -253,6 +251,19 @@ XRootDStatus XCpSrc::Initialize()
continue;
}

URL url( pUrl );
if( url.IsLocalFile() || ( url.IsLocalFile() && url.IsMetalink() ) )
{
std::string datasrv;
pFile->GetProperty( "DataServer", datasrv );
//--------------------------------------------------------------------
// Decide whether we can use PgRead
//--------------------------------------------------------------------
int val = XrdCl::DefaultCpUsePgWrtRd;
XrdCl::DefaultEnv::GetEnv()->GetInt( "CpUsePgWrtRd", val );
pUsePgRead = XrdCl::Utils::HasPgRW( datasrv ) && ( val == 1 );
}

if( pFileSize < 0 )
{
StatInfo *statInfo = 0;
Expand Down Expand Up @@ -304,6 +315,19 @@ XRootDStatus XCpSrc::Recover()
DeletePtr( pFile );
log->Warning( UtilityMsg, "Failed to open %s for reading: %s", pUrl.c_str(), st.GetErrorMessage().c_str() );
}

URL url( pUrl );
if( url.IsLocalFile() || ( url.IsLocalFile() && url.IsMetalink() ) )
{
std::string datasrv;
pFile->GetProperty( "DataServer", datasrv );
//--------------------------------------------------------------------
// Decide whether we can use PgRead
//--------------------------------------------------------------------
int val = XrdCl::DefaultCpUsePgWrtRd;
XrdCl::DefaultEnv::GetEnv()->GetInt( "CpUsePgWrtRd", val );
pUsePgRead = XrdCl::Utils::HasPgRW( datasrv ) && ( val == 1 );
}
}
while( !st.IsOK() );

Expand Down
4 changes: 2 additions & 2 deletions src/XrdCl/XrdClXRootDTransport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,8 @@ namespace XrdCl
while( leftToBeRead )
{
int bytesRead = 0;
XRootDStatus status = socket->Read( message->GetBufferAtCursor(), leftToBeRead, bytesRead );

XRootDStatus status = socket->Read( message->GetBufferAtCursor(),
leftToBeRead, bytesRead );
if( !status.IsOK() || status.code == suRetry )
return status;

Expand Down

0 comments on commit ce38052

Please sign in to comment.