From 269af97e0fbb02a439f79f5c76b87d13988aaabe Mon Sep 17 00:00:00 2001 From: Michal Simon Date: Tue, 2 Oct 2018 10:45:40 +0200 Subject: [PATCH] [XrdCl] Make sure parallel pipeline is not blocking the thread-pool. --- src/XrdCl/XrdClOperationHandlers.hh | 4 +- src/XrdCl/XrdClOperations.cc | 9 ++- src/XrdCl/XrdClOperations.hh | 51 ++++++++-------- src/XrdCl/XrdClParallelOperation.hh | 71 ++++++++++++++++++---- tests/XrdClTests/OperationsWorkflowTest.cc | 11 ++-- 5 files changed, 97 insertions(+), 49 deletions(-) diff --git a/src/XrdCl/XrdClOperationHandlers.hh b/src/XrdCl/XrdClOperationHandlers.hh index 41087d18cf2..b8559e352a2 100644 --- a/src/XrdCl/XrdClOperationHandlers.hh +++ b/src/XrdCl/XrdClOperationHandlers.hh @@ -749,7 +749,7 @@ namespace XrdCl //! @arg Response : response type //---------------------------------------------------------------------------- template - struct Resp: public RespBase + struct Resp: RespBase { //------------------------------------------------------------------------ //! A factory method @@ -787,7 +787,7 @@ namespace XrdCl //! @arg Response : response type //---------------------------------------------------------------------------- template<> - struct Resp: public RespBase + struct Resp: RespBase { //------------------------------------------------------------------------ //! A factory method diff --git a/src/XrdCl/XrdClOperations.cc b/src/XrdCl/XrdClOperations.cc index 9406f57fb44..590f1941a23 100644 --- a/src/XrdCl/XrdClOperations.cc +++ b/src/XrdCl/XrdClOperations.cc @@ -72,11 +72,12 @@ namespace XrdCl if( !st.IsOK() || !nextOperation ) { + if( final ) final( st ); prms.set_value( st ); return; } - nextOperation->Run( std::move( prms ), args ); + nextOperation->Run( std::move( prms ), std::move( final ), args ); } //---------------------------------------------------------------------------- @@ -108,9 +109,11 @@ namespace XrdCl //---------------------------------------------------------------------------- // OperationHandler::AssignToWorkflow //---------------------------------------------------------------------------- - void PipelineHandler::Assign( std::promise p ) + void PipelineHandler::Assign( std::promise p, + std::function f ) { - prms = std::move( p ); + prms = std::move( p ); + final = std::move( f ); } } diff --git a/src/XrdCl/XrdClOperations.hh b/src/XrdCl/XrdClOperations.hh index c33adfd9e88..2aa30d38264 100644 --- a/src/XrdCl/XrdClOperations.hh +++ b/src/XrdCl/XrdClOperations.hh @@ -94,10 +94,12 @@ namespace XrdCl //! Set workflow to this and all next handlers. In the last handler //! it is used to finish workflow execution //! - //! @param wf : workflow to set - //! @throws logic_error : if a workflow has been already assigned + //! @param prms : a promis that the pipeline will have a result + //! @param final : a callable that should be called at the end of + //! pipeline //------------------------------------------------------------------------ - void Assign( std::promise prms ); + void Assign( std::promise prms, + std::function final ); private: @@ -127,6 +129,12 @@ namespace XrdCl //------------------------------------------------------------------------ std::promise prms; + //------------------------------------------------------------------------ + //! The lambda/function/functor that should be called at the end of the + //! pipeline (traveling along the pipeline) + //------------------------------------------------------------------------ + std::function final; + //------------------------------------------------------------------------ //! Arguments for forwarding //------------------------------------------------------------------------ @@ -203,34 +211,26 @@ namespace XrdCl //------------------------------------------------------------------------ virtual std::string ToString() = 0; - //------------------------------------------------------------------------ - //! Conversion to boolean - //! - //! @return : true if it is a valid operation, false otherwise - //------------------------------------------------------------------------ - operator bool() const - { - return valid; - } - protected: //------------------------------------------------------------------------ //! Run operation //! //! @param prom : the promise that we will have a result + //! @param final : the object to call at the end of pipeline //! @param args : forwarded arguments //! @param bucket : number of the bucket with arguments //! //! @return : stOK if operation was scheduled for execution //! successfully, stError otherwise //------------------------------------------------------------------------ - void Run( std::promise prms, - const std::shared_ptr &args, - int bucket = 1 ) + void Run( std::promise prms, + std::function final, + const std::shared_ptr &args, + int bucket = 1 ) { static_assert(state == Handled, "Only Operation can be assigned to workflow"); - handler->Assign( std::move( prms ) ); + handler->Assign( std::move( prms ), std::move( final ) ); XRootDStatus st = RunImpl( args, bucket ); if( st.IsOK() ) handler.release(); else @@ -370,9 +370,12 @@ namespace XrdCl //------------------------------------------------------------------------ //! Conversion to Operation + //! + //! @throws : std::logic_error if pipeline is invalid //------------------------------------------------------------------------ operator Operation&() { + if( !bool( operation ) ) throw std::logic_error( "Invalid pipeline." ); return *operation.get(); } @@ -383,8 +386,7 @@ namespace XrdCl //------------------------------------------------------------------------ operator bool() { - if( !operation ) return false; - return bool( *operation ); + return bool( operation ); } private: @@ -405,20 +407,19 @@ namespace XrdCl //! //! @param args : forwarded arguments //! @param bucket : number of bucket with forwarded params + //! @param final : to be called at the end of the pipeline //------------------------------------------------------------------------ - void Run( std::shared_ptr args, int bucket ) + void Run( std::shared_ptr args, int bucket, + std::function final = nullptr ) { - if( !bool( *operation ) ) - throw std::invalid_argument( "Cannot run invalid Pipeline!" ); - if( ftr.valid() ) - throw std::logic_error( "Pipeline is already running" ); + throw std::logic_error( "Pipeline is already running" ); // TODO vs Parallel // a promise that the pipe will have a result std::promise prms; ftr = prms.get_future(); if( !args ) args = std::make_shared(); - operation->Run( std::move( prms ), args, bucket ); + operation->Run( std::move( prms ), std::move( final ), args, bucket ); } //------------------------------------------------------------------------ diff --git a/src/XrdCl/XrdClParallelOperation.hh b/src/XrdCl/XrdClParallelOperation.hh index a29dc7d1295..83105afeefc 100644 --- a/src/XrdCl/XrdClParallelOperation.hh +++ b/src/XrdCl/XrdClParallelOperation.hh @@ -29,6 +29,8 @@ #include "XrdCl/XrdClOperations.hh" #include "XrdCl/XrdClOperationHandlers.hh" +#include + namespace XrdCl { @@ -101,6 +103,51 @@ namespace XrdCl private: + //------------------------------------------------------------------------ + //! Helper class for handling the PipelineHandler of the + //! ParallelOperation (RAII). + //! + //! Guarantees that the handler will be executed exactly once. + //------------------------------------------------------------------------ + struct Ctx + { + //---------------------------------------------------------------------- + //! Constructor. + //! + //! @param handler : the PipelineHandler of the Parallel operation + //---------------------------------------------------------------------- + Ctx( PipelineHandler *handler ): handler( handler ) + { + + } + + //---------------------------------------------------------------------- + //! Destructor. + //---------------------------------------------------------------------- + ~Ctx() + { + Handle( XRootDStatus() ); + } + + //---------------------------------------------------------------------- + //! Forwards the status to the PipelineHandler if the handler haven't + //! been called yet. + //! + //! @param st : status + //---------------------------------------------------------------------- + void Handle( const XRootDStatus &st ) + { + PipelineHandler* hdlr = handler.exchange( nullptr ); + if( hdlr ) + hdlr->HandleResponse( new XRootDStatus( st ), nullptr ); + } + + //---------------------------------------------------------------------- + //! PipelineHandler of the ParallelOperation + //---------------------------------------------------------------------- + std::atomic handler; + }; + //------------------------------------------------------------------------ //! Run operation //! @@ -109,25 +156,23 @@ namespace XrdCl //! @return : status of the operation //------------------------------------------------------------------------ XRootDStatus RunImpl( const std::shared_ptr ¶ms, - int bucketDefault = 0 ) // TODO that's wrong ! we sync in the handler ! + int bucketDefault = 0 ) { - for( int i = 0; i < pipelines.size(); i++ ) - { - int bucket = i + 1; - pipelines[i].Run( params, bucket ); - } + std::shared_ptr ctx( new Ctx( this->handler.release() ) ); - for( auto &pipe : pipelines ) + try { - XRootDStatus st = pipe.ftr.get(); - if( !st.IsOK() ) + for( size_t i = 0; i < pipelines.size(); ++i ) { - this->handler->HandleResponse( new XRootDStatus( st ), nullptr ); - return st; + pipelines[i].Run( params, i + 1, + [ctx]( const XRootDStatus &st ){ if( !st.IsOK() ) ctx->Handle( st ); } ); } } + catch( std::logic_error &ex ) + { + ctx->Handle( XRootDStatus( stError, errInvalidOp ) ); + } - this->handler->HandleResponse( new XRootDStatus(), nullptr ); return XRootDStatus(); } @@ -197,7 +242,7 @@ namespace XrdCl //---------------------------------------------------------------------------- //! Factory function for creating parallel operation from //! a given number of operations - //! (we use && reference since due to reference colapsing this will fit + //! (we use && reference since due to reference collapsing this will fit //! both r- and l-value references) //---------------------------------------------------------------------------- template diff --git a/tests/XrdClTests/OperationsWorkflowTest.cc b/tests/XrdClTests/OperationsWorkflowTest.cc index 5a6a9c8103f..91cb4c9cc20 100644 --- a/tests/XrdClTests/OperationsWorkflowTest.cc +++ b/tests/XrdClTests/OperationsWorkflowTest.cc @@ -561,12 +561,11 @@ void WorkflowTest::ParallelTest(){ TestingForwardingHandler first; TestingForwardingHandler second; - auto &&creatingPipe = Parallel( - Open(f)(lockUrl, createFlags) >> &first | Close(f)() >> &second, - Open(dataF)(secondFileUrl, createFlags) | Close(dataF)() - ) >> ¶llelOperationHandler; - - CPPUNIT_ASSERT_XRDST( WaitFor( creatingPipe ) ); + std::vector pipes; pipes.reserve( 2 ); + pipes.emplace_back( Open(f)(lockUrl, createFlags) >> &first | Close(f)() >> second ); + pipes.emplace_back( Open(dataF)(secondFileUrl, createFlags) | Close(dataF)() ); + CPPUNIT_ASSERT_XRDST( WaitFor( Parallel( pipes ) >> parallelOperationHandler ) ); + CPPUNIT_ASSERT( pipes.empty() ); delete f; delete dataF;