Skip to content

Commit

Permalink
[XrdCl] fail all handlers in pipeline on failure.
Browse files Browse the repository at this point in the history
  • Loading branch information
simonmichal committed Dec 19, 2018
1 parent 879caa6 commit 2ac3b93
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 108 deletions.
7 changes: 1 addition & 6 deletions src/XrdCl/XrdClFileOperations.hh
Expand Up @@ -196,13 +196,8 @@ namespace XrdCl
template<typename Hdlr>
OpenImpl<true> operator>>( Hdlr &&hdlr )
{
// check if the resulting handler should be owned by us or by the user,
// if the user passed us directly a ResponseHandler it's owned by the
// user, otherwise we need to wrap the argument in a handler and in this
// case the resulting handler will be owned by us
constexpr bool own = !IsResponseHandler<Hdlr>::value;
ExResp factory( *this->file );
return this->StreamImpl( factory.Create( hdlr ), own );
return this->StreamImpl( factory.Create( hdlr ) );
}

//------------------------------------------------------------------------
Expand Down
138 changes: 76 additions & 62 deletions src/XrdCl/XrdClOperationHandlers.hh
Expand Up @@ -30,41 +30,10 @@

#include<functional>
#include<future>
#include <atomic>

namespace XrdCl
{
//----------------------------------------------------------------------------
//! Helper class for checking if a given Handler is derived
//! from ForwardingHandler
//!
//! @arg Hdlr : type of given handler
//----------------------------------------------------------------------------
template<typename Hdlr>
struct IsResponseHandler
{
//------------------------------------------------------------------------
//! true if the Hdlr type has been derived from ForwardingHandler,
//! false otherwise
//------------------------------------------------------------------------
static constexpr bool value = std::is_base_of<XrdCl::ResponseHandler, Hdlr>::value;
};

//----------------------------------------------------------------------------
//! Helper class for checking if a given Handler is derived
//! from ForwardingHandler (overloaded for pointers)
//!
//! @arg Hdlr : type of given handler
//----------------------------------------------------------------------------
template<typename Hdlr>
struct IsResponseHandler<Hdlr*>
{
//------------------------------------------------------------------------
//! true if the Hdlr type has been derived from ForwardingHandler,
//! false otherwise
//------------------------------------------------------------------------
static constexpr bool value = std::is_base_of<XrdCl::ResponseHandler, Hdlr>::value;
};

//----------------------------------------------------------------------------
//! Helper class for unpacking single XAttrStatus from bulk response
//----------------------------------------------------------------------------
Expand Down Expand Up @@ -118,6 +87,7 @@ namespace XrdCl
response->Get( bulk );
*status = bulk->front().status;
std::string *rsp = new std::string( std::move( bulk->front().value ) );
delete bulk;
response->Set( rsp );
handler->HandleResponse( status, response );
delete this;
Expand Down Expand Up @@ -450,22 +420,11 @@ namespace XrdCl
//!
//! @param ftr : the future to be linked with this handler
//------------------------------------------------------------------------
FutureWrapperBase( std::future<Response> &ftr ) : called( false )
FutureWrapperBase( std::future<Response> &ftr )
{
ftr = prms.get_future();
}

//------------------------------------------------------------------------
//! Destructor
//!
//! If the handler was not called sets an exception in the promise
//------------------------------------------------------------------------
~FutureWrapperBase()
{
if( !called )
this->SetException( XRootDStatus( stError, errPipelineFailed ) );
}

protected:

//------------------------------------------------------------------------
Expand All @@ -483,12 +442,6 @@ namespace XrdCl
//! promise that corresponds to the future
//------------------------------------------------------------------------
std::promise<Response> prms;

//------------------------------------------------------------------------
//! true if the handler has been called, false otherwise
//------------------------------------------------------------------------
bool called;

};

//----------------------------------------------------------------------------
Expand Down Expand Up @@ -516,8 +469,6 @@ namespace XrdCl
//------------------------------------------------------------------------
void HandleResponse( XRootDStatus *status, AnyObject *response )
{
this->called = true;

if( status->IsOK() )
{
Response *resp = GetResponse<Response>( response );
Expand Down Expand Up @@ -558,9 +509,6 @@ namespace XrdCl
//------------------------------------------------------------------------
void HandleResponse( XRootDStatus *status, AnyObject *response )
{
this->called = true;


if( status->IsOK() )
{
prms.set_value();
Expand All @@ -575,6 +523,72 @@ namespace XrdCl
};


//----------------------------------------------------------------------------
//! Wrapper class for Pipeline ResponseHandlers.
//!
//! Makes sure that in case the pipeline has failed before the actual handler
//! could be called
//----------------------------------------------------------------------------
class FinalizeHandler : public ResponseHandler
{
public:

//------------------------------------------------------------------------
//! Constructor
//!
//! @param handler : the actual operation handler
//------------------------------------------------------------------------
FinalizeHandler( ResponseHandler *handler ) : handler( handler )
{

}

//------------------------------------------------------------------------
//! Destructor
//------------------------------------------------------------------------
virtual ~FinalizeHandler()
{
ResponseHandler* hdlr = handler.exchange( nullptr );
if( hdlr )
hdlr->HandleResponseWithHosts( new XRootDStatus( stError, errPipelineFailed), nullptr, nullptr );
}

//------------------------------------------------------------------------
//! Callback method (@see ResponseHandler)
//!
//! Note: does not delete itself because it is assumed that it is owned
//! by the PipelineHandler (@see PipelineHandler)
//------------------------------------------------------------------------
virtual void HandleResponseWithHosts( XRootDStatus *status,
AnyObject *response,
HostList *hostList )
{
ResponseHandler* hdlr = handler.exchange( nullptr );
if( hdlr )
hdlr->HandleResponseWithHosts( status, response, hostList );
}

private:

//------------------------------------------------------------------------
//! The actual operation handler
//------------------------------------------------------------------------
std::atomic<ResponseHandler*> handler;
};

//----------------------------------------------------------------------------
//! Utility function for wrapping a ResponseHandler into a FinalizeHandler
//!
//! @param handler : the actual handler
//!
//! @return : a FinalizeHandler
//----------------------------------------------------------------------------
inline FinalizeHandler* make_finalized( ResponseHandler *handler )
{
return new FinalizeHandler( handler );
}


//----------------------------------------------------------------------------
//! A base class for factories, creates ForwardingHandlers from
//! ResponseHandler*, ResponseHandler& and std::future<Response>
Expand All @@ -592,7 +606,7 @@ namespace XrdCl
//------------------------------------------------------------------------
inline static ResponseHandler* Create( ResponseHandler *hdlr )
{
return hdlr;
return make_finalized( hdlr );
}

//------------------------------------------------------------------------
Expand All @@ -603,7 +617,7 @@ namespace XrdCl
//------------------------------------------------------------------------
inline static ResponseHandler* Create( ResponseHandler &hdlr )
{
return &hdlr;
return make_finalized( &hdlr );
}

//------------------------------------------------------------------------
Expand All @@ -614,7 +628,7 @@ namespace XrdCl
//------------------------------------------------------------------------
inline static ResponseHandler* Create( std::future<Response> &ftr )
{
return new FutureWrapper<Response>( ftr );
return make_finalized( new FutureWrapper<Response>( ftr ) );
}
};

Expand All @@ -636,7 +650,7 @@ namespace XrdCl
inline static ResponseHandler* Create( std::function<void( XRootDStatus&,
Response& )> func )
{
return new FunctionWrapper<Response>( func );
return make_finalized( new FunctionWrapper<Response>( func ) );
}

//------------------------------------------------------------------------
Expand All @@ -649,7 +663,7 @@ namespace XrdCl
inline static ResponseHandler* Create( std::packaged_task<Return( XRootDStatus&,
Response& )> &task )
{
return new TaskWrapper<Response, Return>( std::move( task ) );
return make_finalized( new TaskWrapper<Response, Return>( std::move( task ) ) );
}

//------------------------------------------------------------------------
Expand All @@ -674,7 +688,7 @@ namespace XrdCl
//------------------------------------------------------------------------
inline static ResponseHandler* Create( std::function<void( XRootDStatus& )> func )
{
return new FunctionWrapper<void>( func );
return make_finalized( new FunctionWrapper<void>( func ) );
}

//------------------------------------------------------------------------
Expand All @@ -686,7 +700,7 @@ namespace XrdCl
template<typename Return>
inline static ResponseHandler* Create( std::packaged_task<Return( XRootDStatus& )> &task )
{
return new TaskWrapper<void, Return>( std::move( task ) );
return make_finalized( new TaskWrapper<void, Return>( std::move( task ) ) );
}

//------------------------------------------------------------------------
Expand Down
22 changes: 2 additions & 20 deletions src/XrdCl/XrdClOperations.cc
Expand Up @@ -36,15 +36,8 @@ namespace XrdCl
//----------------------------------------------------------------------------
// OperationHandler Constructor.
//----------------------------------------------------------------------------
PipelineHandler::PipelineHandler( ResponseHandler *handler, bool own ) :
responseHandler( handler ), ownHandler( own )
{
}

//----------------------------------------------------------------------------
// Default Constructor.
//----------------------------------------------------------------------------
PipelineHandler::PipelineHandler() : responseHandler( nullptr ), ownHandler( false )
PipelineHandler::PipelineHandler( ResponseHandler *handler ) :
responseHandler( handler )
{
}

Expand Down Expand Up @@ -74,10 +67,7 @@ namespace XrdCl
// We need to copy status as original status object is destroyed in HandleResponse function
XRootDStatus st( *status );
if( responseHandler )
{
responseHandler->HandleResponseWithHosts( status, response, hostList );
ownHandler = false;
}
else
dealloc( status, response, hostList );

Expand Down Expand Up @@ -109,14 +99,6 @@ namespace XrdCl
HandleResponseImpl( status, response );
}

//----------------------------------------------------------------------------
// OperationHandler Destructor
//----------------------------------------------------------------------------
PipelineHandler::~PipelineHandler()
{
if( ownHandler ) delete responseHandler;
}

//----------------------------------------------------------------------------
// OperationHandler::AssignToWorkflow
//----------------------------------------------------------------------------
Expand Down
28 changes: 11 additions & 17 deletions src/XrdCl/XrdClOperations.hh
Expand Up @@ -60,12 +60,14 @@ namespace XrdCl
//! @param own : if true we have the ownership of handler (it's
//! memory), and it is our responsibility to deallocate it
//------------------------------------------------------------------------
PipelineHandler( ResponseHandler *handler, bool own );
PipelineHandler( ResponseHandler *handler );

//------------------------------------------------------------------------
//! Default Constructor.
//------------------------------------------------------------------------
PipelineHandler();
PipelineHandler()
{
}

//------------------------------------------------------------------------
//! Callback function.
Expand All @@ -81,7 +83,9 @@ namespace XrdCl
//------------------------------------------------------------------------
//! Destructor.
//------------------------------------------------------------------------
~PipelineHandler();
~PipelineHandler()
{
}

//------------------------------------------------------------------------
//! Add new operation to the pipeline
Expand Down Expand Up @@ -120,12 +124,7 @@ namespace XrdCl
//------------------------------------------------------------------------
//! The handler of our operation
//------------------------------------------------------------------------
ResponseHandler *responseHandler;

//------------------------------------------------------------------------
//! true, if we own the handler
//------------------------------------------------------------------------
bool ownHandler;
std::unique_ptr<ResponseHandler> responseHandler;

//------------------------------------------------------------------------
//! Next operation in the pipeline
Expand Down Expand Up @@ -505,12 +504,7 @@ namespace XrdCl
template<typename Hdlr>
Derived<true> operator>>( Hdlr &&hdlr )
{
// check if the resulting handler should be owned by us or by the user,
// if the user passed us directly a ResponseHandler it's owned by the
// user, otherwise we need to wrap the argument in a handler and in this
// case the resulting handler will be owned by us
constexpr bool own = !IsResponseHandler<Hdlr>::value;
return this->StreamImpl( HdlrFactory::Create( hdlr ), own );
return this->StreamImpl( HdlrFactory::Create( hdlr ) );
}

//------------------------------------------------------------------------
Expand Down Expand Up @@ -605,10 +599,10 @@ namespace XrdCl
//! @
//! @return : return an instance of Derived<true>
//------------------------------------------------------------------------
inline Derived<true> StreamImpl( ResponseHandler *handler, bool own )
inline Derived<true> StreamImpl( ResponseHandler *handler )
{
static_assert( !HasHndl, "Operator >> is available only for operation without handler" );
this->handler.reset( new PipelineHandler( handler, own ) );
this->handler.reset( new PipelineHandler( handler ) );
return Transform<true>();
}

Expand Down
1 change: 1 addition & 0 deletions src/XrdCl/XrdClXRootDMsgHandler.cc
Expand Up @@ -1704,6 +1704,7 @@ namespace XrdCl
return status;

std::vector<XAttr> resp;
resp.reserve( nattr );

// read the name vec
for( kXR_char i = 0; i < nattr; ++i )
Expand Down

0 comments on commit 2ac3b93

Please sign in to comment.