From 08f47d87a720dffa9c1a625ba5f8893c1b4a7569 Mon Sep 17 00:00:00 2001 From: Michal Simon Date: Wed, 19 Dec 2018 14:55:33 +0100 Subject: [PATCH] [XrdCl] fail all handlers in pipeline on failure. --- src/XrdCl/XrdClFileOperations.hh | 7 +- src/XrdCl/XrdClOperationHandlers.hh | 138 ++++++++++++--------- src/XrdCl/XrdClOperations.cc | 22 +--- src/XrdCl/XrdClOperations.hh | 28 ++--- src/XrdCl/XrdClXRootDMsgHandler.cc | 1 + tests/XrdClTests/OperationsWorkflowTest.cc | 9 +- 6 files changed, 97 insertions(+), 108 deletions(-) diff --git a/src/XrdCl/XrdClFileOperations.hh b/src/XrdCl/XrdClFileOperations.hh index ecc5f571cc7..6f0c10e3688 100644 --- a/src/XrdCl/XrdClFileOperations.hh +++ b/src/XrdCl/XrdClFileOperations.hh @@ -196,13 +196,8 @@ namespace XrdCl template OpenImpl operator>>( Hdlr &&hdlr ) { - // check if the resulting handler should be owned by us or by the user, - // if the user passed us directly a ResponseHandler it's owned by the - // user, otherwise we need to wrap the argument in a handler and in this - // case the resulting handler will be owned by us - constexpr bool own = !IsResponseHandler::value; ExResp factory( *this->file ); - return this->StreamImpl( factory.Create( hdlr ), own ); + return this->StreamImpl( factory.Create( hdlr ) ); } //------------------------------------------------------------------------ diff --git a/src/XrdCl/XrdClOperationHandlers.hh b/src/XrdCl/XrdClOperationHandlers.hh index 06b659c8637..6c9e79727e7 100644 --- a/src/XrdCl/XrdClOperationHandlers.hh +++ b/src/XrdCl/XrdClOperationHandlers.hh @@ -30,41 +30,10 @@ #include #include +#include namespace XrdCl { - //---------------------------------------------------------------------------- - //! Helper class for checking if a given Handler is derived - //! from ForwardingHandler - //! - //! @arg Hdlr : type of given handler - //---------------------------------------------------------------------------- - template - struct IsResponseHandler - { - //------------------------------------------------------------------------ - //! true if the Hdlr type has been derived from ForwardingHandler, - //! false otherwise - //------------------------------------------------------------------------ - static constexpr bool value = std::is_base_of::value; - }; - - //---------------------------------------------------------------------------- - //! Helper class for checking if a given Handler is derived - //! from ForwardingHandler (overloaded for pointers) - //! - //! @arg Hdlr : type of given handler - //---------------------------------------------------------------------------- - template - struct IsResponseHandler - { - //------------------------------------------------------------------------ - //! true if the Hdlr type has been derived from ForwardingHandler, - //! false otherwise - //------------------------------------------------------------------------ - static constexpr bool value = std::is_base_of::value; - }; - //---------------------------------------------------------------------------- //! Helper class for unpacking single XAttrStatus from bulk response //---------------------------------------------------------------------------- @@ -118,6 +87,7 @@ namespace XrdCl response->Get( bulk ); *status = bulk->front().status; std::string *rsp = new std::string( std::move( bulk->front().value ) ); + delete bulk; response->Set( rsp ); handler->HandleResponse( status, response ); delete this; @@ -450,22 +420,11 @@ namespace XrdCl //! //! @param ftr : the future to be linked with this handler //------------------------------------------------------------------------ - FutureWrapperBase( std::future &ftr ) : called( false ) + FutureWrapperBase( std::future &ftr ) { ftr = prms.get_future(); } - //------------------------------------------------------------------------ - //! Destructor - //! - //! If the handler was not called sets an exception in the promise - //------------------------------------------------------------------------ - ~FutureWrapperBase() - { - if( !called ) - this->SetException( XRootDStatus( stError, errPipelineFailed ) ); - } - protected: //------------------------------------------------------------------------ @@ -483,12 +442,6 @@ namespace XrdCl //! promise that corresponds to the future //------------------------------------------------------------------------ std::promise prms; - - //------------------------------------------------------------------------ - //! true if the handler has been called, false otherwise - //------------------------------------------------------------------------ - bool called; - }; //---------------------------------------------------------------------------- @@ -516,8 +469,6 @@ namespace XrdCl //------------------------------------------------------------------------ void HandleResponse( XRootDStatus *status, AnyObject *response ) { - this->called = true; - if( status->IsOK() ) { Response *resp = GetResponse( response ); @@ -558,9 +509,6 @@ namespace XrdCl //------------------------------------------------------------------------ void HandleResponse( XRootDStatus *status, AnyObject *response ) { - this->called = true; - - if( status->IsOK() ) { prms.set_value(); @@ -575,6 +523,72 @@ namespace XrdCl }; + //---------------------------------------------------------------------------- + //! Wrapper class for Pipeline ResponseHandlers. + //! + //! Makes sure that in case the pipeline has failed before the actual handler + //! could be called + //---------------------------------------------------------------------------- + class FinalizeHandler : public ResponseHandler + { + public: + + //------------------------------------------------------------------------ + //! Constructor + //! + //! @param handler : the actual operation handler + //------------------------------------------------------------------------ + FinalizeHandler( ResponseHandler *handler ) : handler( handler ) + { + + } + + //------------------------------------------------------------------------ + //! Destructor + //------------------------------------------------------------------------ + virtual ~FinalizeHandler() + { + ResponseHandler* hdlr = handler.exchange( nullptr ); + if( hdlr ) + hdlr->HandleResponseWithHosts( new XRootDStatus( stError, errPipelineFailed), nullptr, nullptr ); + } + + //------------------------------------------------------------------------ + //! Callback method (@see ResponseHandler) + //! + //! Note: does not delete itself because it is assumed that it is owned + //! by the PipelineHandler (@see PipelineHandler) + //------------------------------------------------------------------------ + virtual void HandleResponseWithHosts( XRootDStatus *status, + AnyObject *response, + HostList *hostList ) + { + ResponseHandler* hdlr = handler.exchange( nullptr ); + if( hdlr ) + hdlr->HandleResponseWithHosts( status, response, hostList ); + } + + private: + + //------------------------------------------------------------------------ + //! The actual operation handler + //------------------------------------------------------------------------ + std::atomic handler; + }; + + //---------------------------------------------------------------------------- + //! Utility function for wrapping a ResponseHandler into a FinalizeHandler + //! + //! @param handler : the actual handler + //! + //! @return : a FinalizeHandler + //---------------------------------------------------------------------------- + inline FinalizeHandler* make_finalized( ResponseHandler *handler ) + { + return new FinalizeHandler( handler ); + } + + //---------------------------------------------------------------------------- //! A base class for factories, creates ForwardingHandlers from //! ResponseHandler*, ResponseHandler& and std::future @@ -592,7 +606,7 @@ namespace XrdCl //------------------------------------------------------------------------ inline static ResponseHandler* Create( ResponseHandler *hdlr ) { - return hdlr; + return make_finalized( hdlr ); } //------------------------------------------------------------------------ @@ -603,7 +617,7 @@ namespace XrdCl //------------------------------------------------------------------------ inline static ResponseHandler* Create( ResponseHandler &hdlr ) { - return &hdlr; + return make_finalized( &hdlr ); } //------------------------------------------------------------------------ @@ -614,7 +628,7 @@ namespace XrdCl //------------------------------------------------------------------------ inline static ResponseHandler* Create( std::future &ftr ) { - return new FutureWrapper( ftr ); + return make_finalized( new FutureWrapper( ftr ) ); } }; @@ -636,7 +650,7 @@ namespace XrdCl inline static ResponseHandler* Create( std::function func ) { - return new FunctionWrapper( func ); + return make_finalized( new FunctionWrapper( func ) ); } //------------------------------------------------------------------------ @@ -649,7 +663,7 @@ namespace XrdCl inline static ResponseHandler* Create( std::packaged_task &task ) { - return new TaskWrapper( std::move( task ) ); + return make_finalized( new TaskWrapper( std::move( task ) ) ); } //------------------------------------------------------------------------ @@ -674,7 +688,7 @@ namespace XrdCl //------------------------------------------------------------------------ inline static ResponseHandler* Create( std::function func ) { - return new FunctionWrapper( func ); + return make_finalized( new FunctionWrapper( func ) ); } //------------------------------------------------------------------------ @@ -686,7 +700,7 @@ namespace XrdCl template inline static ResponseHandler* Create( std::packaged_task &task ) { - return new TaskWrapper( std::move( task ) ); + return make_finalized( new TaskWrapper( std::move( task ) ) ); } //------------------------------------------------------------------------ diff --git a/src/XrdCl/XrdClOperations.cc b/src/XrdCl/XrdClOperations.cc index da090d4e81f..58b64ca72fd 100644 --- a/src/XrdCl/XrdClOperations.cc +++ b/src/XrdCl/XrdClOperations.cc @@ -36,15 +36,8 @@ namespace XrdCl //---------------------------------------------------------------------------- // OperationHandler Constructor. //---------------------------------------------------------------------------- - PipelineHandler::PipelineHandler( ResponseHandler *handler, bool own ) : - responseHandler( handler ), ownHandler( own ) - { - } - - //---------------------------------------------------------------------------- - // Default Constructor. - //---------------------------------------------------------------------------- - PipelineHandler::PipelineHandler() : responseHandler( nullptr ), ownHandler( false ) + PipelineHandler::PipelineHandler( ResponseHandler *handler ) : + responseHandler( handler ) { } @@ -74,10 +67,7 @@ namespace XrdCl // We need to copy status as original status object is destroyed in HandleResponse function XRootDStatus st( *status ); if( responseHandler ) - { responseHandler->HandleResponseWithHosts( status, response, hostList ); - ownHandler = false; - } else dealloc( status, response, hostList ); @@ -109,14 +99,6 @@ namespace XrdCl HandleResponseImpl( status, response ); } - //---------------------------------------------------------------------------- - // OperationHandler Destructor - //---------------------------------------------------------------------------- - PipelineHandler::~PipelineHandler() - { - if( ownHandler ) delete responseHandler; - } - //---------------------------------------------------------------------------- // OperationHandler::AssignToWorkflow //---------------------------------------------------------------------------- diff --git a/src/XrdCl/XrdClOperations.hh b/src/XrdCl/XrdClOperations.hh index aa741a6de7d..30b041eddc8 100644 --- a/src/XrdCl/XrdClOperations.hh +++ b/src/XrdCl/XrdClOperations.hh @@ -60,12 +60,14 @@ namespace XrdCl //! @param own : if true we have the ownership of handler (it's //! memory), and it is our responsibility to deallocate it //------------------------------------------------------------------------ - PipelineHandler( ResponseHandler *handler, bool own ); + PipelineHandler( ResponseHandler *handler ); //------------------------------------------------------------------------ //! Default Constructor. //------------------------------------------------------------------------ - PipelineHandler(); + PipelineHandler() + { + } //------------------------------------------------------------------------ //! Callback function. @@ -81,7 +83,9 @@ namespace XrdCl //------------------------------------------------------------------------ //! Destructor. //------------------------------------------------------------------------ - ~PipelineHandler(); + ~PipelineHandler() + { + } //------------------------------------------------------------------------ //! Add new operation to the pipeline @@ -120,12 +124,7 @@ namespace XrdCl //------------------------------------------------------------------------ //! The handler of our operation //------------------------------------------------------------------------ - ResponseHandler *responseHandler; - - //------------------------------------------------------------------------ - //! true, if we own the handler - //------------------------------------------------------------------------ - bool ownHandler; + std::unique_ptr responseHandler; //------------------------------------------------------------------------ //! Next operation in the pipeline @@ -505,12 +504,7 @@ namespace XrdCl template Derived operator>>( Hdlr &&hdlr ) { - // check if the resulting handler should be owned by us or by the user, - // if the user passed us directly a ResponseHandler it's owned by the - // user, otherwise we need to wrap the argument in a handler and in this - // case the resulting handler will be owned by us - constexpr bool own = !IsResponseHandler::value; - return this->StreamImpl( HdlrFactory::Create( hdlr ), own ); + return this->StreamImpl( HdlrFactory::Create( hdlr ) ); } //------------------------------------------------------------------------ @@ -605,10 +599,10 @@ namespace XrdCl //! @ //! @return : return an instance of Derived //------------------------------------------------------------------------ - inline Derived StreamImpl( ResponseHandler *handler, bool own ) + inline Derived StreamImpl( ResponseHandler *handler ) { static_assert( !HasHndl, "Operator >> is available only for operation without handler" ); - this->handler.reset( new PipelineHandler( handler, own ) ); + this->handler.reset( new PipelineHandler( handler ) ); return Transform(); } diff --git a/src/XrdCl/XrdClXRootDMsgHandler.cc b/src/XrdCl/XrdClXRootDMsgHandler.cc index c2e01502174..a3e3b49d9e1 100644 --- a/src/XrdCl/XrdClXRootDMsgHandler.cc +++ b/src/XrdCl/XrdClXRootDMsgHandler.cc @@ -1820,6 +1820,7 @@ namespace XrdCl return status; std::vector resp; + resp.reserve( nattr ); // read the name vec for( kXR_char i = 0; i < nattr; ++i ) diff --git a/tests/XrdClTests/OperationsWorkflowTest.cc b/tests/XrdClTests/OperationsWorkflowTest.cc index 18777ec5da6..ba45e3e13a8 100644 --- a/tests/XrdClTests/OperationsWorkflowTest.cc +++ b/tests/XrdClTests/OperationsWorkflowTest.cc @@ -322,14 +322,17 @@ void WorkflowTest::MissingParameterTest(){ // Create and execute workflow //---------------------------------------------------------------------------- - bool closed = false; + bool pipebroken = false; const OpenFlags::Flags flags = OpenFlags::Read; uint64_t offset = 0; Pipeline pipe = Open( f, fileUrl, flags ) | Stat( f, true ) | Read( f, offset, size, buffer ) >> readHandler // by reference - | Close( f ) >> [&]( XRootDStatus& st ){ closed = true; }; + | Close( f ) >> [&]( XRootDStatus& st ) + { + pipebroken = !st.IsOK() && ( st.code == errPipelineFailed ); + }; XRootDStatus status = WaitFor( std::move( pipe ) ); CPPUNIT_ASSERT( status.IsError() ); @@ -337,7 +340,7 @@ void WorkflowTest::MissingParameterTest(){ // If there is an error, last handlers should not be executed //---------------------------------------------------------------------------- CPPUNIT_ASSERT( readHandler.Executed() ); - CPPUNIT_ASSERT( !closed ); + CPPUNIT_ASSERT( pipebroken ); }