Skip to content

Commit

Permalink
[XrdCl] xrdcp: allow switching between Read and PgRead.
Browse files Browse the repository at this point in the history
  • Loading branch information
simonmichal authored and gganis committed Nov 23, 2021
1 parent 575db98 commit 0284c9b
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 35 deletions.
79 changes: 52 additions & 27 deletions src/XrdCl/XrdClClassicCopyJob.cc
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,10 @@ namespace
int val = XrdCl::DefaultSubStreamsPerChannel;
XrdCl::DefaultEnv::GetEnv()->GetInt( "SubStreamsPerChannel", val );
pMaxNbConn = val - 1; // account for the control stream

val = XrdCl::DefaultCpUsePgWrtRd;
XrdCl::DefaultEnv::GetEnv()->GetInt( "UsePgWrtRd", val );
pUsePgRead = !url->IsLocalFile() && ( val == 1 );
}

//------------------------------------------------------------------------
Expand Down Expand Up @@ -913,11 +917,10 @@ namespace
chunkSize = pSize - pCurrentOffset;

char *buffer = new char[chunkSize];
ChunkHandler *ch = new ChunkHandler;
ch->chunk.offset = pCurrentOffset;
ch->chunk.length = chunkSize;
ch->chunk.buffer = buffer;
ch->status = reader->Read( pCurrentOffset, chunkSize, buffer, ch );
ChunkHandler *ch = new ChunkHandler( pUsePgRead );
ch->status = pUsePgRead
? reader->PgRead( pCurrentOffset, chunkSize, buffer, ch )
: reader->Read( pCurrentOffset, chunkSize, buffer, ch );
pChunks.push( ch );
pCurrentOffset += chunkSize;
if( !ch->status.IsOK() )
Expand Down Expand Up @@ -1003,7 +1006,7 @@ namespace
class ChunkHandler: public XrdCl::ResponseHandler
{
public:
ChunkHandler(): sem( new XrdSysSemaphore(0) ) {}
ChunkHandler( bool usepgrd ): sem( new XrdSysSemaphore(0) ), usepgrd(usepgrd) {}
virtual ~ChunkHandler() { delete sem; }
virtual void HandleResponse( XrdCl::XRootDStatus *statusval,
XrdCl::AnyObject *response )
Expand All @@ -1012,30 +1015,46 @@ namespace
delete statusval;
if( response )
{
XrdCl::ChunkInfo *resp = 0;
response->Get( resp );
if( resp )
chunk = *resp;
chunk = ToChunk( response );
delete response;
}
sem->Post();
}

XrdCl::ChunkInfo ToChunk( XrdCl::AnyObject *response )
{
if( usepgrd )
{
XrdCl::PageInfo *resp = nullptr;
response->Get( resp );
return XrdCl::ChunkInfo( resp->GetOffset(), resp->GetLength(),
resp->GetBuffer() );
}
else
{
XrdCl::ChunkInfo *resp = nullptr;
response->Get( resp );
return *resp;
}
}

XrdSysSemaphore *sem;
XrdCl::ChunkInfo chunk;
XrdCl::XRootDStatus status;
bool usepgrd;
};

const XrdCl::URL *pUrl;
XrdCl::File *pFile;
int64_t pSize;
int64_t pCurrentOffset;
uint32_t pChunkSize;
uint16_t pParallel;
std::queue<ChunkHandler *> pChunks;
std::string pDataServer;
uint16_t pNbConn;
uint16_t pMaxNbConn;
const XrdCl::URL *pUrl;
XrdCl::File *pFile;
int64_t pSize;
int64_t pCurrentOffset;
uint32_t pChunkSize;
uint16_t pParallel;
std::queue<ChunkHandler*> pChunks;
std::string pDataServer;
uint16_t pNbConn;
uint16_t pMaxNbConn;
bool pUsePgRead;

std::shared_ptr<CancellableJob> pDataConnCB;
};
Expand Down Expand Up @@ -1063,7 +1082,7 @@ namespace
//------------------------------------------------------------------------
//! Destructor
//------------------------------------------------------------------------
~XRootDSourceZip()
virtual ~XRootDSourceZip()
{
CleanUpChunks();

Expand Down Expand Up @@ -1177,8 +1196,8 @@ namespace

private:

XRootDSourceZip(const XRootDSource &other);
XRootDSourceZip &operator = (const XRootDSource &other);
XRootDSourceZip(const XRootDSourceZip &other);
XRootDSourceZip &operator = (const XRootDSourceZip &other);

const std::string pFilename;
XrdCl::ZipArchive *pZipArchive;
Expand Down Expand Up @@ -1209,6 +1228,9 @@ namespace
pUrl( url ), pFile( new XrdCl::File() ), pCurrentOffset( 0 ),
pChunkSize( chunkSize ), pDone( false )
{
int val = XrdCl::DefaultCpUsePgWrtRd;
XrdCl::DefaultEnv::GetEnv()->GetInt( "UsePgWrtRd", val );
pUsePgRead = !url->IsLocalFile() && ( val == 1 );
}

//------------------------------------------------------------------------
Expand Down Expand Up @@ -1287,8 +1309,10 @@ namespace
char *buffer = new char[pChunkSize];
uint32_t bytesRead = 0;

XRootDStatus st = pFile->Read( pCurrentOffset, pChunkSize, buffer,
bytesRead );
std::vector<uint32_t> cksums;
XRootDStatus st = pUsePgRead
? pFile->PgRead( pCurrentOffset, pChunkSize, buffer, cksums, bytesRead )
: pFile->Read( pCurrentOffset, pChunkSize, buffer, bytesRead );

if( !st.IsOK() )
{
Expand Down Expand Up @@ -1365,6 +1389,7 @@ namespace
int64_t pCurrentOffset;
uint32_t pChunkSize;
bool pDone;
bool pUsePgRead;
};

//----------------------------------------------------------------------------
Expand Down Expand Up @@ -1653,7 +1678,7 @@ namespace
//----------------------------------------------------------------------------
//! XRootD destination
//----------------------------------------------------------------------------
class XRootDDestination: public Destination
class XRootDDestination: public Destination // TODO Read/PgRead
{
public:
//------------------------------------------------------------------------
Expand Down Expand Up @@ -1995,7 +2020,7 @@ namespace
//----------------------------------------------------------------------------
//! XRootD destination
//----------------------------------------------------------------------------
class XRootDZipDestination: public Destination
class XRootDZipDestination: public Destination // TODO Read/PgRead
{
public:
//------------------------------------------------------------------------
Expand Down
1 change: 1 addition & 0 deletions src/XrdCl/XrdClConstants.hh
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ namespace XrdCl
const int DefaultWantTlsOnNoPgrw = 0;
const int DefaultRetryWrtAtLBLimit = 3;
const int DefaultCpRetry = 0;
const int DefaultCpUsePgWrtRd = 1;

const char * const DefaultPollerPreference = "built-in";
const char * const DefaultNetworkStack = "IPAuto";
Expand Down
35 changes: 27 additions & 8 deletions src/XrdCl/XrdClXCpSrc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ class ChunkHandler: public ResponseHandler
{
public:

ChunkHandler( XCpSrc *src, uint64_t offset, uint64_t size, char *buffer, File *handle ) :
pSrc( src->Self() ), pOffset( offset ), pSize( size ), pBuffer( buffer ), pHandle( handle )
ChunkHandler( XCpSrc *src, uint64_t offset, uint64_t size, char *buffer, File *handle, bool usepgrd ) :
pSrc( src->Self() ), pOffset( offset ), pSize( size ), pBuffer( buffer ), pHandle( handle ), pUsePgRead( usepgrd )
{

}
Expand All @@ -54,7 +54,7 @@ class ChunkHandler: public ResponseHandler
ChunkInfo *chunk = 0;
if( response ) // get the response
{
response->Get( chunk );
ToChunk( response, chunk );
response->Set( ( int* )0 );
delete response;
}
Expand Down Expand Up @@ -83,11 +83,24 @@ class ChunkHandler: public ResponseHandler

private:

void ToChunk( AnyObject *response, ChunkInfo *&chunk )
{
if( pUsePgRead )
{
PageInfo *rsp = nullptr;
response->Get( rsp );
chunk = new ChunkInfo( rsp->GetOffset(), rsp->GetLength(), rsp->GetBuffer() );
}
else
response->Get( chunk );
}

XCpSrc *pSrc;
uint64_t pOffset;
uint64_t pSize;
char *pBuffer;
File *pHandle;
bool pUsePgRead;
};


Expand All @@ -96,7 +109,9 @@ XCpSrc::XCpSrc( uint32_t chunkSize, uint8_t parallel, int64_t fileSize, XCpCtx *
pCtx( ctx->Self() ), pFile( 0 ), pCurrentOffset( 0 ), pBlkEnd( 0 ), pDataTransfered( 0 ), pRefCount( 1 ),
pRunning( false ), pStartTime( 0 ), pTransferTime( 0 )
{

int val = XrdCl::DefaultCpUsePgWrtRd;
XrdCl::DefaultEnv::GetEnv()->GetInt( "UsePgWrtRd", val );
pUsePgRead = ( val == 1 );
}

XCpSrc::~XCpSrc()
Expand Down Expand Up @@ -317,8 +332,10 @@ XRootDStatus XCpSrc::ReadChunks()
pRecovered.erase( itr );

char *buffer = new char[p.second];
ChunkHandler *handler = new ChunkHandler( this, p.first, p.second, buffer, pFile );
XRootDStatus st = pFile->Read( p.first, p.second, buffer, handler );
ChunkHandler *handler = new ChunkHandler( this, p.first, p.second, buffer, pFile, pUsePgRead );
XRootDStatus st = pUsePgRead
? pFile->PgRead( p.first, p.second, buffer, handler )
: pFile->Read( p.first, p.second, buffer, handler );
if( !st.IsOK() )
{
delete[] buffer;
Expand All @@ -335,8 +352,10 @@ XRootDStatus XCpSrc::ReadChunks()
chunkSize = pBlkEnd - pCurrentOffset;
pOngoing[pCurrentOffset] = chunkSize;
char *buffer = new char[chunkSize];
ChunkHandler *handler = new ChunkHandler( this, pCurrentOffset, chunkSize, buffer, pFile );
XRootDStatus st = pFile->Read( pCurrentOffset, chunkSize, buffer, handler );
ChunkHandler *handler = new ChunkHandler( this, pCurrentOffset, chunkSize, buffer, pFile, pUsePgRead );
XRootDStatus st = pUsePgRead
? pFile->PgRead( pCurrentOffset, chunkSize, buffer, handler )
: pFile->Read( pCurrentOffset, chunkSize, buffer, handler );
pCurrentOffset += chunkSize;
if( !st.IsOK() )
{
Expand Down
6 changes: 6 additions & 0 deletions src/XrdCl/XrdClXCpSrc.hh
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,12 @@ class XCpSrc
* the restart
*/
time_t pTransferTime;

/**
* The total time we were transferring data, before
* the restart
*/
bool pUsePgRead;
};

} /* namespace XrdCl */
Expand Down

0 comments on commit 0284c9b

Please sign in to comment.