diff --git a/src/XrdCl/XrdClOperations.cc b/src/XrdCl/XrdClOperations.cc index 611c72df727..6bba0abd0b9 100644 --- a/src/XrdCl/XrdClOperations.cc +++ b/src/XrdCl/XrdClOperations.cc @@ -210,6 +210,32 @@ namespace XrdCl final = std::move( f ); } + //------------------------------------------------------------------------ + // Called by a pipeline on the handler of its first operation before Run + //------------------------------------------------------------------------ + void PipelineHandler::PreparePipelineStart() + { + // Move any final-function from the handler of the last operaiton to the + // first. It will be moved along the pipeline of handlers while the + // pipeline is run. + + if( final || !nextOperation ) return; + PipelineHandler *last = nextOperation->handler.get(); + while( last ) + { + Operation *nextop = last->nextOperation.get(); + if( !nextop ) break; + last = nextop->handler.get(); + } + if( last ) + { + // swap-then-move rather than only move as we need to guarantee that + // last->final is left without target. + std::function f; + f.swap( last->final ); + Assign( std::move( f ) ); + } + } //------------------------------------------------------------------------ // Stop the current pipeline diff --git a/src/XrdCl/XrdClOperations.hh b/src/XrdCl/XrdClOperations.hh index 24aa66a09c7..e49bb4e84cf 100644 --- a/src/XrdCl/XrdClOperations.hh +++ b/src/XrdCl/XrdClOperations.hh @@ -123,6 +123,11 @@ namespace XrdCl //------------------------------------------------------------------------ void Assign( std::function final ); + //------------------------------------------------------------------------ + //! Called by a pipeline on the handler of its first operation before Run + //------------------------------------------------------------------------ + void PreparePipelineStart(); + private: //------------------------------------------------------------------------ @@ -487,6 +492,10 @@ namespace XrdCl if( !operation ) std::logic_error( "Empty pipeline!" ); Operation *opr = operation.release(); + PipelineHandler *h = opr->handler.get(); + if( h ) + h->PreparePipelineStart(); + opr->Run( timeout, std::move( prms ), std::move( final ) ); } diff --git a/src/XrdCl/XrdClUtils.cc b/src/XrdCl/XrdClUtils.cc index 38a02f771a1..2a8d2b4943d 100644 --- a/src/XrdCl/XrdClUtils.cc +++ b/src/XrdCl/XrdClUtils.cc @@ -865,4 +865,54 @@ namespace XrdCl if( dst_supported.count( *itr ) ) return *itr; return std::string(); } + + //---------------------------------------------------------------------------- + //! Split chunks in a ChunkList into one or more ChunkLists + //---------------------------------------------------------------------------- + void Utils::SplitChunks( std::vector &listsvec, + const ChunkList &chunks, + const uint32_t maxcs, + const size_t maxc ) + { + listsvec.clear(); + if( !chunks.size() ) return; + + listsvec.emplace_back(); + ChunkList *c = &listsvec.back(); + const size_t cs = chunks.size(); + size_t idx = 0; + size_t nc = 0; + ChunkInfo tmpc; + + c->reserve( cs ); + + while( idx < cs ) + { + if( maxc && nc >= maxc ) + { + listsvec.emplace_back(); + c = &listsvec.back(); + c->reserve( cs - idx ); + nc = 0; + } + + if( tmpc.length == 0 ) + tmpc = chunks[idx]; + + if( maxcs && tmpc.length > maxcs ) + { + c->emplace_back( tmpc.offset, maxcs, tmpc.buffer ); + tmpc.offset += maxcs; + tmpc.length -= maxcs; + tmpc.buffer = static_cast( tmpc.buffer ) + maxcs; + } + else + { + c->emplace_back( tmpc.offset, tmpc.length, tmpc.buffer ); + tmpc.length = 0; + ++idx; + } + ++nc; + } + } } diff --git a/src/XrdCl/XrdClUtils.hh b/src/XrdCl/XrdClUtils.hh index 6691196f66b..e3c6a502827 100644 --- a/src/XrdCl/XrdClUtils.hh +++ b/src/XrdCl/XrdClUtils.hh @@ -272,6 +272,18 @@ namespace XrdCl if( !st.IsOK() ) return false; return protver >= kXR_PROTPGRWVERSION; } + + //------------------------------------------------------------------------ + //! Split chunks in a ChunkList into one or more ChunkLists + //! @param listsvec : output vector of ChunkLists + //! @param chunks : input ChunkLisits + //! @param maxcs : maximum size of a ChunkInfo in output + //! @param maxc : maximum number of ChunkInfo in each ChunkList + //------------------------------------------------------------------------ + static void SplitChunks( std::vector &listsvec, + const ChunkList &chunks, + const uint32_t maxcs, + const size_t maxc ); }; //---------------------------------------------------------------------------- diff --git a/src/XrdCl/XrdClZipArchive.cc b/src/XrdCl/XrdClZipArchive.cc index cba1e464d2f..41d14b53af9 100644 --- a/src/XrdCl/XrdClZipArchive.cc +++ b/src/XrdCl/XrdClZipArchive.cc @@ -28,6 +28,7 @@ #include "XrdCl/XrdClLog.hh" #include "XrdCl/XrdClDefaultEnv.hh" #include "XrdCl/XrdClConstants.hh" +#include "XrdCl/XrdClUtils.hh" #include "XrdZip/XrdZipZIP64EOCDL.hh" #include @@ -626,10 +627,18 @@ namespace XrdCl } auto wrtbuff = std::make_shared( GetCD() ); - chunks.emplace_back( cdoff, wrtbuff->size(), wrtbuff->data() ); + Pipeline p = XrdCl::Write( archive, cdoff, + wrtbuff->size(), + wrtbuff->data() ); wrtbufs.emplace_back( std::move( wrtbuff ) ); - Pipeline p = XrdCl::VectorWrite( archive, chunks ); + std::vector listsvec; + XrdCl::Utils::SplitChunks( listsvec, chunks, 262144, 1024 ); + + for(auto itr = listsvec.rbegin(); itr != listsvec.rend(); ++itr) + { + p = XrdCl::VectorWrite( archive, *itr ) | p; + } if( ckpinit ) p |= XrdCl::Checkpoint( archive, ChkPtCode::COMMIT ); p |= Close( archive ) >>