Skip to content

Commit

Permalink
[XrdCl] Make sure parallel pipeline is not blocking the thread-pool.
Browse files Browse the repository at this point in the history
  • Loading branch information
simonmichal committed Oct 2, 2018
1 parent 996bca9 commit 269af97
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 49 deletions.
4 changes: 2 additions & 2 deletions src/XrdCl/XrdClOperationHandlers.hh
Expand Up @@ -749,7 +749,7 @@ namespace XrdCl
//! @arg Response : response type
//----------------------------------------------------------------------------
template<typename Response>
struct Resp: public RespBase<Response>
struct Resp: RespBase<Response>
{
//------------------------------------------------------------------------
//! A factory method
Expand Down Expand Up @@ -787,7 +787,7 @@ namespace XrdCl
//! @arg Response : response type
//----------------------------------------------------------------------------
template<>
struct Resp<void>: public RespBase<void>
struct Resp<void>: RespBase<void>
{
//------------------------------------------------------------------------
//! A factory method
Expand Down
9 changes: 6 additions & 3 deletions src/XrdCl/XrdClOperations.cc
Expand Up @@ -72,11 +72,12 @@ namespace XrdCl

if( !st.IsOK() || !nextOperation )
{
if( final ) final( st );
prms.set_value( st );
return;
}

nextOperation->Run( std::move( prms ), args );
nextOperation->Run( std::move( prms ), std::move( final ), args );
}

//----------------------------------------------------------------------------
Expand Down Expand Up @@ -108,9 +109,11 @@ namespace XrdCl
//----------------------------------------------------------------------------
// OperationHandler::AssignToWorkflow
//----------------------------------------------------------------------------
void PipelineHandler::Assign( std::promise<XRootDStatus> p )
void PipelineHandler::Assign( std::promise<XRootDStatus> p,
std::function<void(const XRootDStatus&)> f )
{
prms = std::move( p );
prms = std::move( p );
final = std::move( f );
}

}
Expand Down
51 changes: 26 additions & 25 deletions src/XrdCl/XrdClOperations.hh
Expand Up @@ -94,10 +94,12 @@ namespace XrdCl
//! Set workflow to this and all next handlers. In the last handler
//! it is used to finish workflow execution
//!
//! @param wf : workflow to set
//! @throws logic_error : if a workflow has been already assigned
//! @param prms : a promis that the pipeline will have a result
//! @param final : a callable that should be called at the end of
//! pipeline
//------------------------------------------------------------------------
void Assign( std::promise<XRootDStatus> prms );
void Assign( std::promise<XRootDStatus> prms,
std::function<void(const XRootDStatus&)> final );

private:

Expand Down Expand Up @@ -127,6 +129,12 @@ namespace XrdCl
//------------------------------------------------------------------------
std::promise<XRootDStatus> prms;

//------------------------------------------------------------------------
//! The lambda/function/functor that should be called at the end of the
//! pipeline (traveling along the pipeline)
//------------------------------------------------------------------------
std::function<void(const XRootDStatus&)> final;

//------------------------------------------------------------------------
//! Arguments for forwarding
//------------------------------------------------------------------------
Expand Down Expand Up @@ -203,34 +211,26 @@ namespace XrdCl
//------------------------------------------------------------------------
virtual std::string ToString() = 0;

//------------------------------------------------------------------------
//! Conversion to boolean
//!
//! @return : true if it is a valid operation, false otherwise
//------------------------------------------------------------------------
operator bool() const
{
return valid;
}

protected:

//------------------------------------------------------------------------
//! Run operation
//!
//! @param prom : the promise that we will have a result
//! @param final : the object to call at the end of pipeline
//! @param args : forwarded arguments
//! @param bucket : number of the bucket with arguments
//!
//! @return : stOK if operation was scheduled for execution
//! successfully, stError otherwise
//------------------------------------------------------------------------
void Run( std::promise<XRootDStatus> prms,
const std::shared_ptr<ArgsContainer> &args,
int bucket = 1 )
void Run( std::promise<XRootDStatus> prms,
std::function<void(const XRootDStatus&)> final,
const std::shared_ptr<ArgsContainer> &args,
int bucket = 1 )
{
static_assert(state == Handled, "Only Operation<Handled> can be assigned to workflow");
handler->Assign( std::move( prms ) );
handler->Assign( std::move( prms ), std::move( final ) );
XRootDStatus st = RunImpl( args, bucket );
if( st.IsOK() ) handler.release();
else
Expand Down Expand Up @@ -370,9 +370,12 @@ namespace XrdCl

//------------------------------------------------------------------------
//! Conversion to Operation<Handled>
//!
//! @throws : std::logic_error if pipeline is invalid
//------------------------------------------------------------------------
operator Operation<Handled>&()
{
if( !bool( operation ) ) throw std::logic_error( "Invalid pipeline." );
return *operation.get();
}

Expand All @@ -383,8 +386,7 @@ namespace XrdCl
//------------------------------------------------------------------------
operator bool()
{
if( !operation ) return false;
return bool( *operation );
return bool( operation );
}

private:
Expand All @@ -405,20 +407,19 @@ namespace XrdCl
//!
//! @param args : forwarded arguments
//! @param bucket : number of bucket with forwarded params
//! @param final : to be called at the end of the pipeline
//------------------------------------------------------------------------
void Run( std::shared_ptr<ArgsContainer> args, int bucket )
void Run( std::shared_ptr<ArgsContainer> args, int bucket,
std::function<void(const XRootDStatus&)> final = nullptr )
{
if( !bool( *operation ) )
throw std::invalid_argument( "Cannot run invalid Pipeline!" );

if( ftr.valid() )
throw std::logic_error( "Pipeline is already running" );
throw std::logic_error( "Pipeline is already running" ); // TODO vs Parallel

// a promise that the pipe will have a result
std::promise<XRootDStatus> prms;
ftr = prms.get_future();
if( !args ) args = std::make_shared<ArgsContainer>();
operation->Run( std::move( prms ), args, bucket );
operation->Run( std::move( prms ), std::move( final ), args, bucket );
}

//------------------------------------------------------------------------
Expand Down
71 changes: 58 additions & 13 deletions src/XrdCl/XrdClParallelOperation.hh
Expand Up @@ -29,6 +29,8 @@
#include "XrdCl/XrdClOperations.hh"
#include "XrdCl/XrdClOperationHandlers.hh"

#include <atomic>

namespace XrdCl
{

Expand Down Expand Up @@ -101,6 +103,51 @@ namespace XrdCl

private:

//------------------------------------------------------------------------
//! Helper class for handling the PipelineHandler of the
//! ParallelOperation (RAII).
//!
//! Guarantees that the handler will be executed exactly once.
//------------------------------------------------------------------------
struct Ctx
{
//----------------------------------------------------------------------
//! Constructor.
//!
//! @param handler : the PipelineHandler of the Parallel operation
//----------------------------------------------------------------------
Ctx( PipelineHandler *handler ): handler( handler )
{

}

//----------------------------------------------------------------------
//! Destructor.
//----------------------------------------------------------------------
~Ctx()
{
Handle( XRootDStatus() );
}

//----------------------------------------------------------------------
//! Forwards the status to the PipelineHandler if the handler haven't
//! been called yet.
//!
//! @param st : status
//----------------------------------------------------------------------
void Handle( const XRootDStatus &st )
{
PipelineHandler* hdlr = handler.exchange( nullptr );
if( hdlr )
hdlr->HandleResponse( new XRootDStatus( st ), nullptr );
}

//----------------------------------------------------------------------
//! PipelineHandler of the ParallelOperation
//----------------------------------------------------------------------
std::atomic<PipelineHandler*> handler;
};

//------------------------------------------------------------------------
//! Run operation
//!
Expand All @@ -109,25 +156,23 @@ namespace XrdCl
//! @return : status of the operation
//------------------------------------------------------------------------
XRootDStatus RunImpl( const std::shared_ptr<ArgsContainer> &params,
int bucketDefault = 0 ) // TODO that's wrong ! we sync in the handler !
int bucketDefault = 0 )
{
for( int i = 0; i < pipelines.size(); i++ )
{
int bucket = i + 1;
pipelines[i].Run( params, bucket );
}
std::shared_ptr<Ctx> ctx( new Ctx( this->handler.release() ) );

for( auto &pipe : pipelines )
try
{
XRootDStatus st = pipe.ftr.get();
if( !st.IsOK() )
for( size_t i = 0; i < pipelines.size(); ++i )
{
this->handler->HandleResponse( new XRootDStatus( st ), nullptr );
return st;
pipelines[i].Run( params, i + 1,
[ctx]( const XRootDStatus &st ){ if( !st.IsOK() ) ctx->Handle( st ); } );
}
}
catch( std::logic_error &ex )
{
ctx->Handle( XRootDStatus( stError, errInvalidOp ) );
}

this->handler->HandleResponse( new XRootDStatus(), nullptr );
return XRootDStatus();
}

Expand Down Expand Up @@ -197,7 +242,7 @@ namespace XrdCl
//----------------------------------------------------------------------------
//! Factory function for creating parallel operation from
//! a given number of operations
//! (we use && reference since due to reference colapsing this will fit
//! (we use && reference since due to reference collapsing this will fit
//! both r- and l-value references)
//----------------------------------------------------------------------------
template<typename ... Operations>
Expand Down
11 changes: 5 additions & 6 deletions tests/XrdClTests/OperationsWorkflowTest.cc
Expand Up @@ -561,12 +561,11 @@ void WorkflowTest::ParallelTest(){
TestingForwardingHandler first;
TestingForwardingHandler second;

auto &&creatingPipe = Parallel(
Open(f)(lockUrl, createFlags) >> &first | Close(f)() >> &second,
Open(dataF)(secondFileUrl, createFlags) | Close(dataF)()
) >> &parallelOperationHandler;

CPPUNIT_ASSERT_XRDST( WaitFor( creatingPipe ) );
std::vector<Pipeline> pipes; pipes.reserve( 2 );
pipes.emplace_back( Open(f)(lockUrl, createFlags) >> &first | Close(f)() >> second );
pipes.emplace_back( Open(dataF)(secondFileUrl, createFlags) | Close(dataF)() );
CPPUNIT_ASSERT_XRDST( WaitFor( Parallel( pipes ) >> parallelOperationHandler ) );
CPPUNIT_ASSERT( pipes.empty() );

delete f;
delete dataF;
Expand Down

0 comments on commit 269af97

Please sign in to comment.