Skip to content

Commit

Permalink
[XrdCl] Make the response ownership more clear.
Browse files Browse the repository at this point in the history
  • Loading branch information
simonmichal authored and gganis committed Nov 23, 2021
1 parent cf02d8f commit cce3271
Show file tree
Hide file tree
Showing 17 changed files with 206 additions and 277 deletions.
36 changes: 16 additions & 20 deletions src/XrdCl/XrdClAsyncMsgReader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ namespace XrdCl
strm( strm ),
substrmnb( substrmnb ),
incmsgsize( 0 ),
inchandler( std::make_pair( nullptr, false ) )
inchandler( nullptr )
{
}

Expand All @@ -71,12 +71,9 @@ namespace XrdCl
inline void Reset()
{
readstage = ReadStart;
if( inchandler.second ) // TODO
incmsg.reset();
else
incmsg.release();
incmsg.reset();
incmsgsize = 0;
inchandler = std::make_pair( nullptr, false );
inchandler = nullptr;
}

//------------------------------------------------------------------------
Expand All @@ -96,7 +93,7 @@ namespace XrdCl
//------------------------------------------------------------------
case ReadStart:
{
incmsg.reset( new Message() );
incmsg = std::make_shared<Message>();
//----------------------------------------------------------------
// The next step is to read the header
//----------------------------------------------------------------
Expand All @@ -108,16 +105,16 @@ namespace XrdCl
//------------------------------------------------------------------
case ReadHeader:
{
XRootDStatus st = xrdTransport.GetHeader( incmsg.get(), &socket );
XRootDStatus st = xrdTransport.GetHeader( *incmsg, &socket );
if( !st.IsOK() || st.code == suRetry ) return st;


log->Dump( AsyncSockMsg, "[%s] Received message header for 0x%x size: %d",
strmname.c_str(), incmsg.get(), incmsg->GetCursor() );
incmsgsize = incmsg->GetCursor();
inchandler = strm.InstallIncHandler( incmsg.get(), substrmnb );
inchandler = strm.InstallIncHandler( incmsg, substrmnb );

if( inchandler.first )
if( inchandler )
{
log->Dump( AsyncSockMsg, "[%s] Will use the raw handler to read body "
"of message 0x%x", strmname.c_str(), incmsg.get() );
Expand All @@ -140,7 +137,7 @@ namespace XrdCl
case ReadRawData:
{
uint32_t bytesRead = 0;
XRootDStatus st = inchandler.first->ReadMessageBody( incmsg.get(), &socket, bytesRead );
XRootDStatus st = inchandler->ReadMessageBody( incmsg.get(), &socket, bytesRead );
if( !st.IsOK() ) return st;
incmsgsize += bytesRead;
if( st.code == suRetry ) return st;
Expand All @@ -155,24 +152,23 @@ namespace XrdCl
//------------------------------------------------------------------
case ReadMsgBody:
{
XRootDStatus st = xrdTransport.GetBody( incmsg.get(), &socket );
XRootDStatus st = xrdTransport.GetBody( *incmsg, &socket );
if( !st.IsOK() || st.code == suRetry ) return st;
incmsgsize = incmsg->GetCursor();

//----------------------------------------------------------------
// Now check if there are some additional raw data to be read
//----------------------------------------------------------------
if( !inchandler.first )
if( !inchandler )
{
uint16_t action = strm.InspectStatusRsp( incmsg.get(), substrmnb,
inchandler.first );
uint16_t action = strm.InspectStatusRsp( *incmsg, substrmnb,
inchandler );

if( action & IncomingMsgHandler::Corrupted )
return XRootDStatus( stError, errCorruptedHeader );

if( action & IncomingMsgHandler::Raw )
{
inchandler.second = true;
//------------------------------------------------------------
// The next step is to read the raw data
//------------------------------------------------------------
Expand Down Expand Up @@ -205,7 +201,7 @@ namespace XrdCl
log->Dump( AsyncSockMsg, "[%s] Received message 0x%x of %d bytes",
strmname.c_str(), incmsg.get(), incmsgsize );

strm.OnIncoming( substrmnb, incmsg.release(), incmsgsize );
strm.OnIncoming( substrmnb, std::move( incmsg ), incmsgsize );
}
}

Expand Down Expand Up @@ -250,9 +246,9 @@ namespace XrdCl
//------------------------------------------------------------------------
// The internal state of the the reader
//------------------------------------------------------------------------
std::unique_ptr<Message> incmsg;
uint32_t incmsgsize;
std::pair<IncomingMsgHandler*, bool> inchandler; //< true means the handler owns the server response
std::shared_ptr<Message> incmsg; //< the ownership is shared with MsgHandler
uint32_t incmsgsize;
IncomingMsgHandler *inchandler;

};

Expand Down
4 changes: 2 additions & 2 deletions src/XrdCl/XrdClAsyncSocketHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,7 @@ namespace XrdCl
Log *log = DefaultEnv::GetLog();
if( !pHeaderDone )
{
st = pTransport->GetHeader( toRead, pSocket );
st = pTransport->GetHeader( *toRead, pSocket );
if( st.IsOK() && st.code == suDone )
{
log->Dump( AsyncSockMsg,
Expand All @@ -762,7 +762,7 @@ namespace XrdCl
return st;
}

st = pTransport->GetBody( toRead, pSocket );
st = pTransport->GetBody( *toRead, pSocket );
if( st.IsOK() && st.code == suDone )
{
log->Dump( AsyncSockMsg, "[%s] Received a message of %d bytes",
Expand Down
20 changes: 11 additions & 9 deletions src/XrdCl/XrdClChannel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,24 +62,26 @@ namespace
//------------------------------------------------------------------------
// Message handler
//------------------------------------------------------------------------
virtual uint16_t Examine( XrdCl::Message *msg )
virtual uint16_t Examine( std::shared_ptr<XrdCl::Message> &msg )
{
if( pFilter->Filter( msg ) )
{
pMsg = msg;
return Take | RemoveHandler;
}
return Ignore;
}

//------------------------------------------------------------------------
//! Reexamine the incoming message, and decide on the action to be taken
//------------------------------------------------------------------------
virtual uint16_t InspectStatusRsp( XrdCl::Message *msg )
virtual uint16_t InspectStatusRsp( XrdCl::Message &msg )
{
return 0;
}

virtual void Process( XrdCl::Message *msg )
virtual void Process()
{
pMsg = msg;
pSem->Post();
}

Expand Down Expand Up @@ -110,7 +112,7 @@ namespace
//------------------------------------------------------------------------
XrdCl::Message *GetMessage()
{
return pMsg;
return pMsg.get();
}

//------------------------------------------------------------------------
Expand All @@ -128,10 +130,10 @@ namespace
FilterHandler(const FilterHandler &other);
FilterHandler &operator = (const FilterHandler &other);

XrdSysSemaphore *pSem;
XrdCl::MessageFilter *pFilter;
XrdCl::Message *pMsg;
XrdCl::XRootDStatus pStatus;
XrdSysSemaphore *pSem;
XrdCl::MessageFilter *pFilter;
std::shared_ptr<XrdCl::Message> pMsg;
XrdCl::XRootDStatus pStatus;
};

//----------------------------------------------------------------------------
Expand Down
34 changes: 17 additions & 17 deletions src/XrdCl/XrdClInQueue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,27 @@ namespace XrdCl
//----------------------------------------------------------------------------
// Filter messages
//----------------------------------------------------------------------------
bool InQueue::DiscardMessage(Message* msg, uint16_t& sid) const
bool InQueue::DiscardMessage( Message& msg, uint16_t& sid) const
{
if( msg->GetSize() < 8 )
if( msg.GetSize() < 8 )
return true;

ServerResponse *rsp = (ServerResponse *)msg->GetBuffer();
ServerResponse *rsp = (ServerResponse *)msg.GetBuffer();

// We got an async message
if( rsp->hdr.status == kXR_attn )
{
if( msg->GetSize() < 12 )
return true;
if( msg.GetSize() < 12 )
return true;

// We only care about async responses
if( rsp->body.attn.actnum != (int32_t)htonl(kXR_asynresp) )
return true;
return true;

if( msg->GetSize() < 24 )
return true;
if( msg.GetSize() < 24 )
return true;

ServerResponse *embRsp = (ServerResponse*)msg->GetBuffer(16);
ServerResponse *embRsp = (ServerResponse*)msg.GetBuffer(16);
sid = ((uint16_t)embRsp->hdr.streamid[1] << 8) | (uint16_t)embRsp->hdr.streamid[0];
}
else
Expand All @@ -62,13 +62,13 @@ namespace XrdCl
//----------------------------------------------------------------------------
// Add a message to the queue
//----------------------------------------------------------------------------
bool InQueue::AddMessage( Message *msg )
bool InQueue::AddMessage( std::shared_ptr<Message> msg )
{
uint16_t action = 0;
IncomingMsgHandler* handler = 0;
uint16_t msgSid = 0;

if (DiscardMessage(msg, msgSid))
if (DiscardMessage(*msg, msgSid))
{
return true;
}
Expand All @@ -92,7 +92,7 @@ namespace XrdCl
pMutex.UnLock();

if( handler && !(action & IncomingMsgHandler::NoProcess) )
handler->Process( msg );
handler->Process();

return true;
}
Expand All @@ -114,7 +114,7 @@ namespace XrdCl
if( action & IncomingMsgHandler::Take )
{
if( !(action & IncomingMsgHandler::NoProcess ) )
handler->Process( it->second );
handler->Process();

pMessages.erase( it );
}
Expand All @@ -128,16 +128,16 @@ namespace XrdCl
// Get a message handler interested in receiving message whose header
// is stored in msg
//----------------------------------------------------------------------------
IncomingMsgHandler *InQueue::GetHandlerForMessage( Message *msg,
time_t &expires,
uint16_t &action )
IncomingMsgHandler *InQueue::GetHandlerForMessage( std::shared_ptr<Message> &msg,
time_t &expires,
uint16_t &action )
{
time_t exp = 0;
uint16_t act = 0;
uint16_t msgSid = 0;
IncomingMsgHandler* handler = 0;

if (DiscardMessage(msg, msgSid))
if (DiscardMessage(*msg, msgSid))
{
return handler;
}
Expand Down
13 changes: 7 additions & 6 deletions src/XrdCl/XrdClInQueue.hh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <XrdSys/XrdSysPthread.hh>
#include <map>
#include <memory>
#include <utility>
#include "XrdCl/XrdClXRootDResponses.hh"
#include "XrdCl/XrdClPostMasterInterfaces.hh"
Expand All @@ -38,7 +39,7 @@ namespace XrdCl
//------------------------------------------------------------------------
//! Add a fully reconstructed message to the queue
//------------------------------------------------------------------------
bool AddMessage( Message *msg );
bool AddMessage( std::shared_ptr<Message> msg );

//------------------------------------------------------------------------
//! Add a listener that should be notified about incoming messages
Expand All @@ -58,9 +59,9 @@ namespace XrdCl
//!
//! @return handler or 0 if none is interested
//------------------------------------------------------------------------
IncomingMsgHandler *GetHandlerForMessage( Message *msg,
time_t &expires,
uint16_t &action );
IncomingMsgHandler *GetHandlerForMessage( std::shared_ptr<Message> &msg,
time_t &expires,
uint16_t &action );

//------------------------------------------------------------------------
//! Re-insert the handler without scanning the cached messages
Expand Down Expand Up @@ -95,11 +96,11 @@ namespace XrdCl
//!
//! @return true if message discarded, otherwise false
//------------------------------------------------------------------------
bool DiscardMessage(Message* msg, uint16_t& sid) const;
bool DiscardMessage(Message& msg, uint16_t& sid) const;

typedef std::pair<IncomingMsgHandler *, time_t> HandlerAndExpire;
typedef std::map<uint16_t, HandlerAndExpire> HandlerMap;
typedef std::map<uint16_t, Message*> MessageMap;
typedef std::map<uint16_t, std::shared_ptr<Message>> MessageMap;
MessageMap pMessages;
HandlerMap pHandlers;
XrdSysRecMutex pMutex;
Expand Down

0 comments on commit cce3271

Please sign in to comment.