Skip to content

Commit

Permalink
[XrdCl] Further simplify and consolidate response processing.
Browse files Browse the repository at this point in the history
  • Loading branch information
simonmichal committed May 17, 2022
1 parent 153d3c5 commit d7002b8
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 110 deletions.
32 changes: 11 additions & 21 deletions src/XrdCl/XrdClInQueue.cc
Expand Up @@ -68,6 +68,10 @@ namespace XrdCl
//----------------------------------------------------------------------------
// Get a message handler interested in receiving message whose header
// is stored in msg
//
// If the handler is found it is always removed from the queue for the time
// of processing the response. In case of a t/o, stream error, if the response
// has been chunked the handler should be readded using `ReAddMessageHandler`.
//----------------------------------------------------------------------------
MsgHandler *InQueue::GetHandlerForMessage( std::shared_ptr<Message> &msg,
time_t &expires,
Expand All @@ -91,9 +95,7 @@ namespace XrdCl
handler = it->second.first;
act = handler->Examine( msg );
exp = it->second.second;

if( act & MsgHandler::RemoveHandler )
pHandlers.erase( it );
pHandlers.erase( it );
}

if( handler )
Expand All @@ -109,36 +111,25 @@ namespace XrdCl
// Re-insert the handler without scanning the cached messages
//----------------------------------------------------------------------------
void InQueue::ReAddMessageHandler( MsgHandler *handler,
time_t expires )
time_t expires )
{
uint16_t handlerSid = handler->GetSid();
XrdSysMutexHelper scopedLock( pMutex );
pHandlers[handlerSid] = HandlerAndExpire( handler, expires );
}

//----------------------------------------------------------------------------
// Remove a listener
//----------------------------------------------------------------------------
void InQueue::RemoveMessageHandler( MsgHandler *handler )
{
uint16_t handlerSid = handler->GetSid();
XrdSysMutexHelper scopedLock( pMutex );
pHandlers.erase(handlerSid);
}

//----------------------------------------------------------------------------
// Report an event to the handlers
//----------------------------------------------------------------------------
void InQueue::ReportStreamEvent( MsgHandler::StreamEvent event,
XRootDStatus status )
XRootDStatus status )
{
uint8_t action = 0;
XrdSysMutexHelper scopedLock( pMutex );
for( HandlerMap::iterator it = pHandlers.begin(); it != pHandlers.end(); )
{
action = it->second.first->OnStreamEvent( event, status );

if( action & MsgHandler::RemoveHandler )
it->second.first->OnStreamEvent( event, status );
if( event != MsgHandler::Ready )
{
auto next = it; ++next;
pHandlers.erase( it );
Expand All @@ -163,11 +154,10 @@ namespace XrdCl
{
if( it->second.second <= now )
{
uint8_t act = it->second.first->OnStreamEvent( MsgHandler::Timeout,
it->second.first->OnStreamEvent( MsgHandler::Timeout,
Status( stError, errOperationExpired ) );
auto next = it; ++next;
if( act & MsgHandler::RemoveHandler )
pHandlers.erase( it );
pHandlers.erase( it );
it = next;
}
else
Expand Down
6 changes: 1 addition & 5 deletions src/XrdCl/XrdClInQueue.hh
Expand Up @@ -36,6 +36,7 @@ namespace XrdCl
class InQueue
{
public:

//------------------------------------------------------------------------
//! Add a listener that should be notified about incoming messages
//!
Expand Down Expand Up @@ -65,11 +66,6 @@ namespace XrdCl
//------------------------------------------------------------------------
void ReAddMessageHandler( MsgHandler *handler, time_t expires );

//------------------------------------------------------------------------
//! Remove a listener
//------------------------------------------------------------------------
void RemoveMessageHandler( MsgHandler *handler );

//------------------------------------------------------------------------
//! Report an event to the handlers
//------------------------------------------------------------------------
Expand Down
6 changes: 2 additions & 4 deletions src/XrdCl/XrdClPostMasterInterfaces.hh
Expand Up @@ -148,13 +148,11 @@ namespace XrdCl
//!
//! @param event type of the event
//! @param status status info
//! @return Action::RemoveHandler or 0
//------------------------------------------------------------------------
virtual uint8_t OnStreamEvent( StreamEvent event,
XRootDStatus status )
virtual void OnStreamEvent( StreamEvent event,
XRootDStatus status )
{
(void)event; (void)status;
return 0;
};

//------------------------------------------------------------------------
Expand Down
29 changes: 0 additions & 29 deletions src/XrdCl/XrdClStream.cc
Expand Up @@ -427,25 +427,6 @@ namespace XrdCl
return MessageUtils::SendMessage( *pUrl, msg, handler, params, 0 );
}

//------------------------------------------------------------------------
// Check if message is a partial response
//------------------------------------------------------------------------
bool Stream::IsPartial( Message &msg )
{
ServerResponseHeader *rsphdr = (ServerResponseHeader*)msg.GetBuffer();
if( rsphdr->status == kXR_oksofar )
return true;

if( rsphdr->status == kXR_status )
{
ServerResponseStatus *rspst = (ServerResponseStatus*)msg.GetBuffer();
if( rspst->bdy.resptype == XrdProto::kXR_PartialResult )
return true;
}

return false;
}

//----------------------------------------------------------------------------
// Call back when a message has been reconstructed
//----------------------------------------------------------------------------
Expand Down Expand Up @@ -497,13 +478,6 @@ namespace XrdCl
log->Dump( PostMasterMsg, "[%s] Ignoring the processing handler for: 0x%x.",
pStreamName.c_str(), msg->GetDescription().c_str() );

// if we are handling partial response we have to take down the timeout fence
if( IsPartial( *msg ) )
{
XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( mh.handler );
if( xrdHandler ) xrdHandler->PartialReceived();
}

mh.Reset();
return;
}
Expand Down Expand Up @@ -1117,9 +1091,6 @@ namespace XrdCl
uint16_t action = mh.handler->InspectStatusRsp();
mh.action |= action;

if( action & MsgHandler::RemoveHandler )
pIncomingQueue->RemoveMessageHandler( mh.handler );

if( action & MsgHandler::Raw )
{
incHandler = mh.handler;
Expand Down
5 changes: 0 additions & 5 deletions src/XrdCl/XrdClStream.hh
Expand Up @@ -277,11 +277,6 @@ namespace XrdCl

private:

//------------------------------------------------------------------------
//! Check if message is a partial response
//------------------------------------------------------------------------
static bool IsPartial( Message &msg );

//------------------------------------------------------------------------
//! Check if addresses contains given address
//------------------------------------------------------------------------
Expand Down
37 changes: 9 additions & 28 deletions src/XrdCl/XrdClXRootDMsgHandler.cc
Expand Up @@ -169,6 +169,12 @@ namespace XrdCl
pResponse = msg;
pBodyReader->SetDataLength( dlen );

//--------------------------------------------------------------------------
// We received a new message, in case this is kXR_status we will need to
// unmarshal the body.
//--------------------------------------------------------------------------
pRspStatusBodyUnMarshaled = false;

Log *log = DefaultEnv::GetLog();
switch( status )
{
Expand Down Expand Up @@ -229,9 +235,7 @@ namespace XrdCl
pRequest->GetDescription().c_str() );

if( !pOksofarAsAnswer )
{
pPartialResps.emplace_back( std::move( pResponse ) );
}

//----------------------------------------------------------------------
// For kXR_read we either read in raw mode if the message has not
Expand All @@ -240,19 +244,13 @@ namespace XrdCl
//----------------------------------------------------------------------
uint16_t reqId = ntohs( req->header.requestid );
if( reqId == kXR_read )
{
pTimeoutFence.store( true, std::memory_order_relaxed );
return Raw | ( pOksofarAsAnswer ? None : NoProcess );
}

//----------------------------------------------------------------------
// kXR_readv is similar to read, except that the payload is different
//----------------------------------------------------------------------
if( reqId == kXR_readv )
{
pTimeoutFence.store( true, std::memory_order_relaxed );
return Raw | ( pOksofarAsAnswer ? None : NoProcess );
}

return ( pOksofarAsAnswer ? None : NoProcess );
}
Expand Down Expand Up @@ -353,10 +351,7 @@ namespace XrdCl
action |= Raw;

if( rspst->bdy.resptype == XrdProto::kXR_PartialResult )
{
action |= NoProcess;
pTimeoutFence.store( true, std::memory_order_relaxed );
}
else
action |= RemoveHandler;
}
Expand Down Expand Up @@ -868,18 +863,15 @@ namespace XrdCl
//----------------------------------------------------------------------------
// Handle an event other that a message arrival - may be timeout
//----------------------------------------------------------------------------
uint8_t XRootDMsgHandler::OnStreamEvent( StreamEvent event,
XRootDStatus status )
void XRootDMsgHandler::OnStreamEvent( StreamEvent event,
XRootDStatus status )
{
Log *log = DefaultEnv::GetLog();
log->Dump( XRootDMsg, "[%s] Stream event reported for msg %s",
pUrl.GetHostId().c_str(), pRequest->GetDescription().c_str() );

if( event == Ready )
return 0;

if( pTimeoutFence.load( std::memory_order_relaxed ) )
return 0;
return;

if( pSidMgr && pMsgInFly && ( event == Timeout
|| status.code == errOperationExpired
Expand All @@ -890,7 +882,6 @@ namespace XrdCl
}

HandleError( status );
return RemoveHandler;
}

//----------------------------------------------------------------------------
Expand Down Expand Up @@ -1120,15 +1111,6 @@ namespace XrdCl
HandleError( RetryAtServer( pUrl, RedirectEntry::EntryWait ) );
}

//----------------------------------------------------------------------------
// Bookkeeping after partial response has been received.
//----------------------------------------------------------------------------
void XRootDMsgHandler::PartialReceived()
{
pTimeoutFence.store( false, std::memory_order_relaxed ); // Take down the timeout fence
pRspStatusBodyUnMarshaled = false;
}

//----------------------------------------------------------------------------
// Unpack the message and call the response handler
//----------------------------------------------------------------------------
Expand Down Expand Up @@ -1206,7 +1188,6 @@ namespace XrdCl
{
XrdSysCondVarHelper lck( pCV );
pResponse.reset();
pTimeoutFence.store( false, std::memory_order_relaxed );
pCV.Broadcast();
}
}
Expand Down
20 changes: 2 additions & 18 deletions src/XrdCl/XrdClXRootDMsgHandler.hh
Expand Up @@ -168,8 +168,6 @@ namespace XrdCl

pMsgInFly( false ),

pTimeoutFence( false ),

pDirListStarted( false ),
pDirListWithStat( false ),

Expand Down Expand Up @@ -285,8 +283,8 @@ namespace XrdCl
//! @param streamNum stream concerned
//! @param status status info
//------------------------------------------------------------------------
virtual uint8_t OnStreamEvent( StreamEvent event,
XRootDStatus status );
virtual void OnStreamEvent( StreamEvent event,
XRootDStatus status );

//------------------------------------------------------------------------
//! The requested action has been performed and the status is available
Expand Down Expand Up @@ -425,13 +423,6 @@ namespace XrdCl
pStateful = stateful;
}

//------------------------------------------------------------------------
//! Bookkeeping after partial response has been received:
//! - take down the timeout fence after oksofar response has been handled
//! - reset status-response-body marshaled flag
//------------------------------------------------------------------------
void PartialReceived();

private:

//------------------------------------------------------------------------
Expand Down Expand Up @@ -644,13 +635,6 @@ namespace XrdCl

bool pMsgInFly;

//------------------------------------------------------------------------
// true if MsgHandler is both in inQueue and installed in respective
// Stream (this could happen if server gave oksofar response), otherwise
// false
//------------------------------------------------------------------------
std::atomic<bool> pTimeoutFence;

//------------------------------------------------------------------------
// if we are serving chunked data to the user's handler in case of
// kXR_dirlist we need to memorize if the response contains stat info or
Expand Down

0 comments on commit d7002b8

Please sign in to comment.