diff --git a/src/XrdCl/XrdClClassicCopyJob.cc b/src/XrdCl/XrdClClassicCopyJob.cc index b983b921d2c..e53ba6499e9 100644 --- a/src/XrdCl/XrdClClassicCopyJob.cc +++ b/src/XrdCl/XrdClClassicCopyJob.cc @@ -62,21 +62,25 @@ namespace //---------------------------------------------------------------------------- //! Helper timer class //---------------------------------------------------------------------------- - class timer_sec_t + template> + class mytimer_t { public: - timer_sec_t() : start( clock_t::now() ){ } + mytimer_t() : start( clock_t::now() ){ } void reset(){ start = clock_t::now(); } uint16_t elapsed() const { - return std::chrono::duration_cast( clock_t::now() - start ).count(); + return std::chrono::duration_cast( clock_t::now() - start ).count(); } private: typedef std::chrono::high_resolution_clock clock_t; - typedef std::chrono::duration sec_t; + typedef std::chrono::duration unit_t; std::chrono::time_point start; }; + using timer_sec_t = mytimer_t; + using timer_nsec_t = mytimer_t; + //---------------------------------------------------------------------------- //! Check sum helper for stdio //---------------------------------------------------------------------------- @@ -2059,8 +2063,10 @@ namespace XrdCl } ChunkInfo chunkInfo; + uint64_t total_processed = 0; uint64_t processed = 0; auto start = time_nsec(); + timer_nsec_t threshold_timer; while( 1 ) { st = src->GetChunk( chunkInfo ); @@ -2073,36 +2079,43 @@ namespace XrdCl if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout return XRootDStatus( stError, errOperationExpired, 0, "CPTimeout exceeded." ); - if( xRate || xRateThreashold ) + if( xRate ) { auto elapsed = ( time_nsec() - start ).count(); - double transferred = processed + chunkInfo.length; + double transferred = total_processed + chunkInfo.length; double expected = double( xRate ) / to_nsec( 1 ) * elapsed; - double threshold = double( xRateThreashold ) / to_nsec( 1 ) * elapsed; //---------------------------------------------------------------------- - // check if our transfer rate is not bellow the threshold + // check if our transfer rate didn't exceeded the limit + // (we are too fast) + //---------------------------------------------------------------------- + if( elapsed && // make sure elapsed time is greater than 0 + transferred > expected ) + { + auto nsec = ( transferred / xRate * to_nsec( 1 ) ) - elapsed; + sleep_nsec( nsec ); + } + } + + if( xRateThreashold ) + { + auto elapsed = threshold_timer.elapsed(); + double transferred = processed + chunkInfo.length; + double expected = double( xRateThreashold ) / to_nsec( 1 ) * elapsed; + //---------------------------------------------------------------------- + // check if our transfer rate dropped below the threshold // (we are too slow) //---------------------------------------------------------------------- - if( xRateThreashold && elapsed && // make sure elapsed time is greater than 0 - transferred < threshold ) + if( elapsed && // make sure elapsed time is greater than 0 + transferred < expected ) { log->Warning( UtilityMsg, "Transfer rate dropped below requested threshold," " trying different source!" ); - std::cout << "calling TryOtherServer" << std::endl; XRootDStatus st = src->TryOtherServer(); if( !st.IsOK() ) return XRootDStatus( stError, errThresholdExceeded, 0, "The transfer rate dropped below " "requested threshold!" ); - } - //---------------------------------------------------------------------- - // check if our transfer rate didn't exceeded the limit - // (we are too fast) - //---------------------------------------------------------------------- - if( xRate && elapsed && // make sure elapsed time is greater than 0 - transferred > expected ) - { - auto nsec = ( transferred / xRate * to_nsec( 1 ) ) - elapsed; - sleep_nsec( nsec ); + processed = 0; + threshold_timer.reset(); } } @@ -2119,10 +2132,11 @@ namespace XrdCl return UpdateErrMsg( st, "destination" ); } - processed += chunkInfo.length; + total_processed += chunkInfo.length; + processed += chunkInfo.length; if( progress ) { - progress->JobProgress( pJobId, processed, size ); + progress->JobProgress( pJobId, total_processed, size ); if( progress->ShouldCancel( pJobId ) ) return XRootDStatus( stError, errOperationInterrupted, kXR_Cancelled, "The copy-job has been cancelled!" ); } @@ -2148,13 +2162,13 @@ namespace XrdCl // The size of the source is known and not enough data has been transfered // to the destination //-------------------------------------------------------------------------- - if( src->GetSize() >= 0 && size != processed ) + if( src->GetSize() >= 0 && size != total_processed ) { log->Error( UtilityMsg, "The declared source size is %ld bytes, but " - "received %ld bytes.", size, processed ); + "received %ld bytes.", size, total_processed ); return XRootDStatus( stError, errDataError ); } - pResults->Set( "size", processed ); + pResults->Set( "size", total_processed ); //-------------------------------------------------------------------------- // Finalize the destination