Skip to content

Commit

Permalink
[XrdCl] Simplify kXR_attn handling.
Browse files Browse the repository at this point in the history
  • Loading branch information
simonmichal committed May 4, 2022
1 parent fb26c7f commit 85cbeaf
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 227 deletions.
83 changes: 78 additions & 5 deletions src/XrdCl/XrdClAsyncMsgReader.hh
Expand Up @@ -106,11 +106,22 @@ namespace XrdCl
case ReadHeader:
{
XRootDStatus st = xrdTransport.GetHeader( *inmsg, &socket );
if( !st.IsOK() || st.code == suRetry ) return st;

if( !st.IsOK() || st.code == suRetry )
return st;

log->Dump( AsyncSockMsg, "[%s] Received message header for 0x%x size: %d",
strmname.c_str(), inmsg.get(), inmsg->GetCursor() );

ServerResponse *rsp = (ServerResponse*)inmsg->GetBuffer();
if( rsp->hdr.status == kXR_attn )
{
log->Dump( AsyncSockMsg, "[%s] Will readout the attn action code "
"of message 0x%x", strmname.c_str(), inmsg.get() );
inmsg->ReAllocate( 16 ); // header (bytes 8) + action code (8 bytes)
readstage = ReadAttn;
continue;
}

inmsgsize = inmsg->GetCursor();
inhandler = strm.InstallIncHandler( inmsg, substrmnb );

Expand All @@ -131,16 +142,44 @@ namespace XrdCl
continue;
}
//------------------------------------------------------------------
// Before proceeding we need to figure out the attn action code
//------------------------------------------------------------------
case ReadAttn:
{
XRootDStatus st = ReadAttnActnum();
if( !st.IsOK() || st.code == suRetry )
return st;

//----------------------------------------------------------------
// There is an embedded response, overwrite the message with that
//----------------------------------------------------------------
if( HasEmbeddedRsp() )
{
inmsg->Free();
readstage = ReadHeader;
continue;
}

//----------------------------------------------------------------
// Readout the rest of the body
//----------------------------------------------------------------
inmsgsize = inmsg->GetCursor();
readstage = ReadMsgBody;
continue;
}
//------------------------------------------------------------------
// We need to call a raw message handler to get the data from the
// socket
//------------------------------------------------------------------
case ReadRawData:
{
uint32_t bytesRead = 0;
XRootDStatus st = inhandler->ReadMessageBody( inmsg.get(), &socket, bytesRead );
if( !st.IsOK() ) return st;
if( !st.IsOK() )
return st;
inmsgsize += bytesRead;
if( st.code == suRetry ) return st;
if( st.code == suRetry )
return st;
//----------------------------------------------------------------
// The next step is to finalize the read
//----------------------------------------------------------------
Expand All @@ -153,7 +192,8 @@ namespace XrdCl
case ReadMsgBody:
{
XRootDStatus st = xrdTransport.GetBody( *inmsg, &socket );
if( !st.IsOK() || st.code == suRetry ) return st;
if( !st.IsOK() || st.code == suRetry )
return st;
inmsgsize = inmsg->GetCursor();

//----------------------------------------------------------------
Expand Down Expand Up @@ -216,13 +256,46 @@ namespace XrdCl

private:

XRootDStatus ReadAttnActnum()
{
//----------------------------------------------------------------------
// Readout the action code from the socket. We are reading out 8 bytes
// into the message, the 8 byte header is already there.
//----------------------------------------------------------------------
size_t btsleft = 8 - ( inmsg->GetCursor() - 8 );
while( btsleft > 0 )
{
int btsrd = 0;
XRootDStatus st = socket.Read( inmsg->GetBufferAtCursor(), btsleft, btsrd );
if( !st.IsOK() || st.code == suRetry )
return st;
btsleft -= btsrd;
inmsg->AdvanceCursor( btsrd );
}

//----------------------------------------------------------------------
// Marshal the action code
//----------------------------------------------------------------------
ServerResponseBody_Attn *attn = (ServerResponseBody_Attn*)inmsg->GetBuffer( 8 );
attn->actnum = ntohl( attn->actnum );

return XRootDStatus();
}

inline bool HasEmbeddedRsp()
{
ServerResponseBody_Attn *attn = (ServerResponseBody_Attn*)inmsg->GetBuffer( 8 );
return ( attn->actnum == kXR_asynresp );
}

//------------------------------------------------------------------------
//! Stages of reading out a response from the socket
//------------------------------------------------------------------------
enum Stage
{
ReadStart, //< the next step is to initialize the read
ReadHeader, //< the next step is to read the header
ReadAttn, //< the next step is to read attn action code
ReadMsgBody, //< the next step is to read the body
ReadRawData, //< the next step is to read the raw data
ReadDone //< the next step is to finalize the read
Expand Down
57 changes: 3 additions & 54 deletions src/XrdCl/XrdClInQueue.cc
Expand Up @@ -35,67 +35,16 @@ namespace XrdCl

ServerResponse *rsp = (ServerResponse *)msg.GetBuffer();

// We got an async message
// We only care about async responses, but those are extracted now
// in the SocketHandler
if( rsp->hdr.status == kXR_attn )
{
if( msg.GetSize() < 12 )
return true;

// We only care about async responses
if( rsp->body.attn.actnum != (int32_t)htonl(kXR_asynresp) )
return true;

if( msg.GetSize() < 24 )
return true;

ServerResponse *embRsp = (ServerResponse*)msg.GetBuffer(16);
sid = ((uint16_t)embRsp->hdr.streamid[1] << 8) | (uint16_t)embRsp->hdr.streamid[0];
}
return true;
else
{
sid = ((uint16_t)rsp->hdr.streamid[1] << 8) | (uint16_t)rsp->hdr.streamid[0];
}

return false;
}

//----------------------------------------------------------------------------
// Add a message to the queue
//----------------------------------------------------------------------------
bool InQueue::AddMessage( std::shared_ptr<Message> msg )
{
uint16_t action = 0;
MsgHandler* handler = 0;
uint16_t msgSid = 0;

if (DiscardMessage(*msg, msgSid))
{
return true;
}

// Lookup the sid in the map of handlers
pMutex.Lock();
HandlerMap::iterator it = pHandlers.find(msgSid);

if (it != pHandlers.end())
{
handler = it->second.first;
action = handler->Examine( msg );

if( action & MsgHandler::RemoveHandler )
pHandlers.erase( it );
}
else
pMessages[msgSid] = msg;

pMutex.UnLock();

if( handler && !(action & MsgHandler::NoProcess) )
handler->Process();

return true;
}

//----------------------------------------------------------------------------
// Add a listener that should be notified about incoming messages
//----------------------------------------------------------------------------
Expand Down
5 changes: 0 additions & 5 deletions src/XrdCl/XrdClInQueue.hh
Expand Up @@ -36,11 +36,6 @@ namespace XrdCl
class InQueue
{
public:
//------------------------------------------------------------------------
//! Add a fully reconstructed message to the queue
//------------------------------------------------------------------------
bool AddMessage( std::shared_ptr<Message> msg );

//------------------------------------------------------------------------
//! Add a listener that should be notified about incoming messages
//!
Expand Down
28 changes: 7 additions & 21 deletions src/XrdCl/XrdClStream.cc
Expand Up @@ -471,30 +471,16 @@ namespace XrdCl
InMessageHelper &mh = pSubStreams[subStream]->inMsgHelper;

//--------------------------------------------------------------------------
// Check if we can obtain a handler now that we have the whole response
// No handler, we discard the message ...
//--------------------------------------------------------------------------
if( !mh.handler )
{
//------------------------------------------------------------------------
// Try once more if we can obtain a handler for the message, for the async
// (kXR_attn) messages this can be only done once the whole message has
// been read out from the socket
//------------------------------------------------------------------------
mh.handler = pIncomingQueue->GetHandlerForMessage( msg,
mh.expires,
mh.action );
//------------------------------------------------------------------------
// No handler, we discard the message ...
//------------------------------------------------------------------------
if( !mh.handler )
{
ServerResponse *rsp = (ServerResponse*)msg->GetBuffer();
log->Warning( PostMasterMsg, "[%s] Discarding received message: 0x%x "
"(status=%d, SID=[%d,%d]), no MsgHandler found.",
pStreamName.c_str(), msg.get(), rsp->hdr.status,
rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
return;
}
ServerResponse *rsp = (ServerResponse*)msg->GetBuffer();
log->Warning( PostMasterMsg, "[%s] Discarding received message: 0x%x "
"(status=%d, SID=[%d,%d]), no MsgHandler found.",
pStreamName.c_str(), msg.get(), rsp->hdr.status,
rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
return;
}

//--------------------------------------------------------------------------
Expand Down
17 changes: 0 additions & 17 deletions src/XrdCl/XrdClStream.hh
Expand Up @@ -296,23 +296,6 @@ namespace XrdCl
return false;
}

//------------------------------------------------------------------------
// Job queuing the incoming messages
//------------------------------------------------------------------------
class QueueIncMsgJob: public Job
{
public:
QueueIncMsgJob( InQueue &queue, std::shared_ptr<Message> msg ): pQueue( queue ), msg( std::move( msg ) ) {};
virtual ~QueueIncMsgJob() {};
virtual void Run( void* )
{
pQueue.AddMessage( std::move( msg ) );
}
private:
InQueue &pQueue;
std::shared_ptr<Message> msg;
};

//------------------------------------------------------------------------
// Job handling the incoming messages
//------------------------------------------------------------------------
Expand Down

0 comments on commit 85cbeaf

Please sign in to comment.