Skip to content

Commit

Permalink
[XrdCl] SocketHandler: encapsulate the request write operation in a s…
Browse files Browse the repository at this point in the history
…eparate class.
  • Loading branch information
simonmichal authored and gganis committed Nov 23, 2021
1 parent 6837632 commit a1e8796
Show file tree
Hide file tree
Showing 5 changed files with 332 additions and 184 deletions.
22 changes: 14 additions & 8 deletions src/XrdCl/XrdClAsyncHSReader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ namespace XrdCl
//------------------------------------------------------------------
case ReadStart:
{
incmsg.reset( new Message() );
inmsg.reset( new Message() );
//----------------------------------------------------------------
// The next step is to read the header
//----------------------------------------------------------------
Expand All @@ -87,11 +87,11 @@ namespace XrdCl
//------------------------------------------------------------------
case ReadHeader:
{
XRootDStatus st = xrdTransport.GetHeader( *incmsg, &socket );
XRootDStatus st = xrdTransport.GetHeader( *inmsg, &socket );
if( !st.IsOK() || st.code == suRetry ) return st;
log->Dump( AsyncSockMsg,
"[%s] Received message header, size: %d",
strmname.c_str(), incmsg->GetCursor() );
strmname.c_str(), inmsg->GetCursor() );
//----------------------------------------------------------------
// The next step is to read the message body
//----------------------------------------------------------------
Expand All @@ -103,17 +103,23 @@ namespace XrdCl
//------------------------------------------------------------------
case ReadMsgBody:
{
XRootDStatus st = xrdTransport.GetBody( *incmsg, &socket );
XRootDStatus st = xrdTransport.GetBody( *inmsg, &socket );
if( !st.IsOK() || st.code == suRetry ) return st;
log->Dump( AsyncSockMsg, "[%s] Received a message of %d bytes",
strmname.c_str(), incmsg->GetSize() );
strmname.c_str(), inmsg->GetSize() );
readstage = ReadDone;
return st;
}

case ReadDone: return XRootDStatus();
}
// just in case ...
break;
}
//----------------------------------------------------------------------
// We are done
//----------------------------------------------------------------------
return XRootDStatus();
}

//------------------------------------------------------------------------
Expand All @@ -122,7 +128,7 @@ namespace XrdCl
std::unique_ptr<Message> ReleaseMsg()
{
readstage = ReadStart;
return std::move( incmsg );
return std::move( inmsg );
}

//------------------------------------------------------------------------
Expand All @@ -131,7 +137,7 @@ namespace XrdCl
inline void Reset()
{
readstage = ReadStart;
incmsg.reset();
inmsg.reset();
}

private:
Expand Down Expand Up @@ -164,7 +170,7 @@ namespace XrdCl
//------------------------------------------------------------------------
// The internal state of the the reader
//------------------------------------------------------------------------
std::unique_ptr<Message> incmsg;
std::unique_ptr<Message> inmsg;
};
}

Expand Down
48 changes: 24 additions & 24 deletions src/XrdCl/XrdClAsyncMsgReader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ namespace XrdCl
strmname( strmname ),
strm( strm ),
substrmnb( substrmnb ),
incmsgsize( 0 ),
inchandler( nullptr )
inmsgsize( 0 ),
inhandler( nullptr )
{
}

Expand All @@ -71,9 +71,9 @@ namespace XrdCl
inline void Reset()
{
readstage = ReadStart;
incmsg.reset();
incmsgsize = 0;
inchandler = nullptr;
inmsg.reset();
inmsgsize = 0;
inhandler = nullptr;
}

//------------------------------------------------------------------------
Expand All @@ -93,7 +93,7 @@ namespace XrdCl
//------------------------------------------------------------------
case ReadStart:
{
incmsg = std::make_shared<Message>();
inmsg = std::make_shared<Message>();
//----------------------------------------------------------------
// The next step is to read the header
//----------------------------------------------------------------
Expand All @@ -105,19 +105,19 @@ namespace XrdCl
//------------------------------------------------------------------
case ReadHeader:
{
XRootDStatus st = xrdTransport.GetHeader( *incmsg, &socket );
XRootDStatus st = xrdTransport.GetHeader( *inmsg, &socket );
if( !st.IsOK() || st.code == suRetry ) return st;


log->Dump( AsyncSockMsg, "[%s] Received message header for 0x%x size: %d",
strmname.c_str(), incmsg.get(), incmsg->GetCursor() );
incmsgsize = incmsg->GetCursor();
inchandler = strm.InstallIncHandler( incmsg, substrmnb );
strmname.c_str(), inmsg.get(), inmsg->GetCursor() );
inmsgsize = inmsg->GetCursor();
inhandler = strm.InstallIncHandler( inmsg, substrmnb );

if( inchandler )
if( inhandler )
{
log->Dump( AsyncSockMsg, "[%s] Will use the raw handler to read body "
"of message 0x%x", strmname.c_str(), incmsg.get() );
"of message 0x%x", strmname.c_str(), inmsg.get() );
//--------------------------------------------------------------
// The next step is to read raw data
//--------------------------------------------------------------
Expand All @@ -137,9 +137,9 @@ namespace XrdCl
case ReadRawData:
{
uint32_t bytesRead = 0;
XRootDStatus st = inchandler->ReadMessageBody( incmsg.get(), &socket, bytesRead );
XRootDStatus st = inhandler->ReadMessageBody( inmsg.get(), &socket, bytesRead );
if( !st.IsOK() ) return st;
incmsgsize += bytesRead;
inmsgsize += bytesRead;
if( st.code == suRetry ) return st;
//----------------------------------------------------------------
// The next step is to finalize the read
Expand All @@ -152,17 +152,17 @@ namespace XrdCl
//------------------------------------------------------------------
case ReadMsgBody:
{
XRootDStatus st = xrdTransport.GetBody( *incmsg, &socket );
XRootDStatus st = xrdTransport.GetBody( *inmsg, &socket );
if( !st.IsOK() || st.code == suRetry ) return st;
incmsgsize = incmsg->GetCursor();
inmsgsize = inmsg->GetCursor();

//----------------------------------------------------------------
// Now check if there are some additional raw data to be read
//----------------------------------------------------------------
if( !inchandler )
if( !inhandler )
{
uint16_t action = strm.InspectStatusRsp( substrmnb,
inchandler );
inhandler );

if( action & IncomingMsgHandler::Corrupted )
return XRootDStatus( stError, errCorruptedHeader );
Expand Down Expand Up @@ -199,12 +199,12 @@ namespace XrdCl
// Report the incoming message
//----------------------------------------------------------------
log->Dump( AsyncSockMsg, "[%s] Received message 0x%x of %d bytes",
strmname.c_str(), incmsg.get(), incmsgsize );
strmname.c_str(), inmsg.get(), inmsgsize );

strm.OnIncoming( substrmnb, std::move( incmsg ), incmsgsize );
strm.OnIncoming( substrmnb, std::move( inmsg ), inmsgsize );
}
}

// just in case
break;
}

Expand Down Expand Up @@ -246,9 +246,9 @@ namespace XrdCl
//------------------------------------------------------------------------
// The internal state of the the reader
//------------------------------------------------------------------------
std::shared_ptr<Message> incmsg; //< the ownership is shared with MsgHandler
uint32_t incmsgsize;
IncomingMsgHandler *inchandler;
std::shared_ptr<Message> inmsg; //< the ownership is shared with MsgHandler
uint32_t inmsgsize;
IncomingMsgHandler *inhandler;

};

Expand Down

0 comments on commit a1e8796

Please sign in to comment.