Skip to content

Commit

Permalink
[XrdCl] xrdcp: implement xrate threshold, part 2.
Browse files Browse the repository at this point in the history
  • Loading branch information
simonmichal committed Apr 27, 2021
1 parent 0fce57c commit 799f34d
Showing 1 changed file with 40 additions and 26 deletions.
66 changes: 40 additions & 26 deletions src/XrdCl/XrdClClassicCopyJob.cc
Expand Up @@ -62,21 +62,25 @@ namespace
//----------------------------------------------------------------------------
//! Helper timer class
//----------------------------------------------------------------------------
class timer_sec_t
template<typename U = std::ratio<1, 1>>
class mytimer_t
{
public:
timer_sec_t() : start( clock_t::now() ){ }
mytimer_t() : start( clock_t::now() ){ }
void reset(){ start = clock_t::now(); }
uint16_t elapsed() const
{
return std::chrono::duration_cast<sec_t>( clock_t::now() - start ).count();
return std::chrono::duration_cast<unit_t>( clock_t::now() - start ).count();
}
private:
typedef std::chrono::high_resolution_clock clock_t;
typedef std::chrono::duration<uint16_t> sec_t;
typedef std::chrono::duration<uint64_t, U> unit_t;
std::chrono::time_point<clock_t> start;
};

using timer_sec_t = mytimer_t<std::milli>;
using timer_nsec_t = mytimer_t<std::nano>;

//----------------------------------------------------------------------------
//! Check sum helper for stdio
//----------------------------------------------------------------------------
Expand Down Expand Up @@ -2059,8 +2063,10 @@ namespace XrdCl
}

ChunkInfo chunkInfo;
uint64_t total_processed = 0;
uint64_t processed = 0;
auto start = time_nsec();
timer_nsec_t threshold_timer;
while( 1 )
{
st = src->GetChunk( chunkInfo );
Expand All @@ -2073,36 +2079,43 @@ namespace XrdCl
if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
return XRootDStatus( stError, errOperationExpired, 0, "CPTimeout exceeded." );

if( xRate || xRateThreashold )
if( xRate )
{
auto elapsed = ( time_nsec() - start ).count();
double transferred = processed + chunkInfo.length;
double transferred = total_processed + chunkInfo.length;
double expected = double( xRate ) / to_nsec( 1 ) * elapsed;
double threshold = double( xRateThreashold ) / to_nsec( 1 ) * elapsed;
//----------------------------------------------------------------------
// check if our transfer rate is not bellow the threshold
// check if our transfer rate didn't exceeded the limit
// (we are too fast)
//----------------------------------------------------------------------
if( elapsed && // make sure elapsed time is greater than 0
transferred > expected )
{
auto nsec = ( transferred / xRate * to_nsec( 1 ) ) - elapsed;
sleep_nsec( nsec );
}
}

if( xRateThreashold )
{
auto elapsed = threshold_timer.elapsed();
double transferred = processed + chunkInfo.length;
double expected = double( xRateThreashold ) / to_nsec( 1 ) * elapsed;
//----------------------------------------------------------------------
// check if our transfer rate dropped below the threshold
// (we are too slow)
//----------------------------------------------------------------------
if( xRateThreashold && elapsed && // make sure elapsed time is greater than 0
transferred < threshold )
if( elapsed && // make sure elapsed time is greater than 0
transferred < expected )
{
log->Warning( UtilityMsg, "Transfer rate dropped below requested threshold,"
" trying different source!" );
std::cout << "calling TryOtherServer" << std::endl;
XRootDStatus st = src->TryOtherServer();
if( !st.IsOK() ) return XRootDStatus( stError, errThresholdExceeded, 0,
"The transfer rate dropped below "
"requested threshold!" );
}
//----------------------------------------------------------------------
// check if our transfer rate didn't exceeded the limit
// (we are too fast)
//----------------------------------------------------------------------
if( xRate && elapsed && // make sure elapsed time is greater than 0
transferred > expected )
{
auto nsec = ( transferred / xRate * to_nsec( 1 ) ) - elapsed;
sleep_nsec( nsec );
processed = 0;
threshold_timer.reset();
}
}

Expand All @@ -2119,10 +2132,11 @@ namespace XrdCl
return UpdateErrMsg( st, "destination" );
}

processed += chunkInfo.length;
total_processed += chunkInfo.length;
processed += chunkInfo.length;
if( progress )
{
progress->JobProgress( pJobId, processed, size );
progress->JobProgress( pJobId, total_processed, size );
if( progress->ShouldCancel( pJobId ) )
return XRootDStatus( stError, errOperationInterrupted, kXR_Cancelled, "The copy-job has been cancelled!" );
}
Expand All @@ -2148,13 +2162,13 @@ namespace XrdCl
// The size of the source is known and not enough data has been transfered
// to the destination
//--------------------------------------------------------------------------
if( src->GetSize() >= 0 && size != processed )
if( src->GetSize() >= 0 && size != total_processed )
{
log->Error( UtilityMsg, "The declared source size is %ld bytes, but "
"received %ld bytes.", size, processed );
"received %ld bytes.", size, total_processed );
return XRootDStatus( stError, errDataError );
}
pResults->Set( "size", processed );
pResults->Set( "size", total_processed );

//--------------------------------------------------------------------------
// Finalize the destination
Expand Down

0 comments on commit 799f34d

Please sign in to comment.