Skip to content

Commit

Permalink
[XrdCl] SocketHandler: encapsulate the hand-shake read operation in a…
Browse files Browse the repository at this point in the history
… separate class.
  • Loading branch information
simonmichal authored and gganis committed Nov 23, 2021
1 parent aa791b7 commit 6837632
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 53 deletions.
171 changes: 171 additions & 0 deletions src/XrdCl/XrdClAsyncHSReader.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
//------------------------------------------------------------------------------
// Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
// Author: Michal Simon <michal.simon@cern.ch>
//------------------------------------------------------------------------------
// XRootD is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// XRootD is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
//------------------------------------------------------------------------------

#ifndef SRC_XRDCL_XRDCLASYNCHSREADER_HH_
#define SRC_XRDCL_XRDCLASYNCHSREADER_HH_

#include "XrdCl/XrdClMessage.hh"
#include "XrdCl/XrdClPostMasterInterfaces.hh"
#include "XrdCl/XrdClXRootDResponses.hh"
#include "XrdCl/XrdClSocket.hh"
#include "XrdCl/XrdClConstants.hh"
#include "XrdCl/XrdClStream.hh"

#include <memory>

namespace XrdCl
{
//----------------------------------------------------------------------------
//! Utility class encapsulating reading hand-shake response logic
//----------------------------------------------------------------------------
class AsyncHSReader
{
public:
//------------------------------------------------------------------------
//! Constructor
//!
//! @param xrdTransport : the (xrootd) transport layer
//! @param socket : the socket with the message to be read out
//! @param strmname : stream name
//! @param strm : the stream encapsulating the connection
//! @param substrmnb : the substream number
//------------------------------------------------------------------------
AsyncHSReader( TransportHandler &xrdTransport,
Socket &socket,
const std::string &strmname,
Stream &strm,
uint16_t substrmnb) : readstage( ReadStart ),
xrdTransport( xrdTransport ),
socket( socket ),
strmname( strmname ),
strm( strm ),
substrmnb( substrmnb )
{
}

//------------------------------------------------------------------------
//! Read out the response from the socket
//------------------------------------------------------------------------
XRootDStatus Read()
{
Log *log = DefaultEnv::GetLog();

while( true )
{
switch( readstage )
{
//------------------------------------------------------------------
// There is no incoming message currently being processed so we
// create a new one
//------------------------------------------------------------------
case ReadStart:
{
incmsg.reset( new Message() );
//----------------------------------------------------------------
// The next step is to read the header
//----------------------------------------------------------------
readstage = ReadHeader;
continue;
}
//------------------------------------------------------------------
// We need to read the header
//------------------------------------------------------------------
case ReadHeader:
{
XRootDStatus st = xrdTransport.GetHeader( *incmsg, &socket );
if( !st.IsOK() || st.code == suRetry ) return st;
log->Dump( AsyncSockMsg,
"[%s] Received message header, size: %d",
strmname.c_str(), incmsg->GetCursor() );
//----------------------------------------------------------------
// The next step is to read the message body
//----------------------------------------------------------------
readstage = ReadMsgBody;
continue;
}
//------------------------------------------------------------------
// We read the message to the buffer
//------------------------------------------------------------------
case ReadMsgBody:
{
XRootDStatus st = xrdTransport.GetBody( *incmsg, &socket );
if( !st.IsOK() || st.code == suRetry ) return st;
log->Dump( AsyncSockMsg, "[%s] Received a message of %d bytes",
strmname.c_str(), incmsg->GetSize() );
readstage = ReadDone;
return st;
}

case ReadDone: return XRootDStatus();
}
}
}

//------------------------------------------------------------------------
//! Transfer the received message ownership
//------------------------------------------------------------------------
std::unique_ptr<Message> ReleaseMsg()
{
readstage = ReadStart;
return std::move( incmsg );
}

//------------------------------------------------------------------------
//! Reset the state of the object (makes it ready to read out next msg)
//------------------------------------------------------------------------
inline void Reset()
{
readstage = ReadStart;
incmsg.reset();
}

private:

//------------------------------------------------------------------------
//! Stages of reading out a response from the socket
//------------------------------------------------------------------------
enum Stage
{
ReadStart, //< the next step is to initialize the read
ReadHeader, //< the next step is to read the header
ReadMsgBody, //< the next step is to read the body
ReadDone //< we are done
};

//------------------------------------------------------------------------
// Current read stage
//------------------------------------------------------------------------
Stage readstage;

//------------------------------------------------------------------------
// The context of the read operation
//------------------------------------------------------------------------
TransportHandler &xrdTransport;
Socket &socket;
const std::string &strmname;
Stream &strm;
uint16_t substrmnb;

//------------------------------------------------------------------------
// The internal state of the the reader
//------------------------------------------------------------------------
std::unique_ptr<Message> incmsg;
};
}

#endif /* SRC_XRDCL_XRDCLASYNCHSREADER_HH_ */
60 changes: 15 additions & 45 deletions src/XrdCl/XrdClAsyncSocketHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,12 @@ namespace XrdCl
pStream( strm ),
pStreamName( ToStreamName( strm, subStreamNum ) ),
pSocket( new Socket() ),
pHSIncoming( 0 ),
pOutgoing( 0 ),
pSignature( 0 ),
pHandShakeData( 0 ),
pHandShakeDone( false ),
pConnectionStarted( 0 ),
pConnectionTimeout( 0 ),
pHeaderDone( false ),
pOutMsgDone( false ),
pOutHandler( 0 ),
pOutMsgSize( 0 ),
Expand Down Expand Up @@ -319,6 +317,7 @@ namespace XrdCl
pSocket->SetStatus( Socket::Connected );
hswriter.reset( new MsgWriter( *pSocket, pStreamName ) );
rspreader.reset( new AsyncMsgReader( *pTransport, *pSocket, pStreamName, *pStream, pSubStreamNum ) );
hsreader.reset( new AsyncHSReader( *pTransport, *pSocket, pStreamName, *pStream, pSubStreamNum ) );

//--------------------------------------------------------------------------
// Cork the socket
Expand Down Expand Up @@ -603,11 +602,20 @@ namespace XrdCl
//----------------------------------------------------------------------------
void AsyncSocketHandler::OnReadWhileHandshaking()
{
//--------------------------------------------------------------------------
// Make sure the response reader object exists
//--------------------------------------------------------------------------
if( !hsreader )
{
OnFault( XRootDStatus( stError, errInternal, 0, "Hand-shake reader is null." ) );
return;
}

//--------------------------------------------------------------------------
// Read the message and let the transport handler look at it when
// reading has finished
//--------------------------------------------------------------------------
XRootDStatus st = ReadMessage( pHSIncoming );
XRootDStatus st = hsreader->Read();
if( !st.IsOK() )
{
OnFaultWhileHandshaking( st );
Expand All @@ -617,19 +625,18 @@ namespace XrdCl
if( st.code != suDone )
return;

HandleHandShake();
HandleHandShake( hsreader->ReleaseMsg() );
}

//------------------------------------------------------------------------
// Handle the handshake message
//------------------------------------------------------------------------
void AsyncSocketHandler::HandleHandShake()
void AsyncSocketHandler::HandleHandShake( std::unique_ptr<Message> msg )
{
//--------------------------------------------------------------------------
// OK, we have a new message, let's deal with it;
//--------------------------------------------------------------------------
pHandShakeData->in = pHSIncoming;
pHSIncoming = 0;
pHandShakeData->in = msg.release();
XRootDStatus st = pTransport->HandShake( pHandShakeData, *pChannelData );

//--------------------------------------------------------------------------
Expand Down Expand Up @@ -735,42 +742,6 @@ namespace XrdCl
}
}

//----------------------------------------------------------------------------
// Read a message
//----------------------------------------------------------------------------
XRootDStatus AsyncSocketHandler::ReadMessage( Message *&toRead )
{
if( !toRead )
{
pHeaderDone = false;
toRead = new Message();
}

XRootDStatus st;
Log *log = DefaultEnv::GetLog();
if( !pHeaderDone )
{
st = pTransport->GetHeader( *toRead, pSocket );
if( st.IsOK() && st.code == suDone )
{
log->Dump( AsyncSockMsg,
"[%s] Received message header, size: %d",
pStreamName.c_str(), toRead->GetCursor() );
pHeaderDone = true;
}
else
return st;
}

st = pTransport->GetBody( *toRead, pSocket );
if( st.IsOK() && st.code == suDone )
{
log->Dump( AsyncSockMsg, "[%s] Received a message of %d bytes",
pStreamName.c_str(), toRead->GetSize() );
}
return st;
}

//----------------------------------------------------------------------------
// Handle fault
//----------------------------------------------------------------------------
Expand All @@ -795,8 +766,7 @@ namespace XrdCl
Log *log = DefaultEnv::GetLog();
log->Error( AsyncSockMsg, "[%s] Socket error while handshaking: %s",
pStreamName.c_str(), st.ToString().c_str() );
delete pHSIncoming;
pHSIncoming = 0;
if( hsreader ) hsreader->Reset();
if( hswriter ) hswriter->Reset();

pStream->OnConnectError( pSubStreamNum, st );
Expand Down
11 changes: 3 additions & 8 deletions src/XrdCl/XrdClAsyncSocketHandler.hh
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "XrdCl/XrdClURL.hh"
#include "XrdCl/XrdClAsyncWriter.hh"
#include "XrdCl/XrdClAsyncMsgReader.hh"
#include "XrdCl/XrdClAsyncHSReader.hh"

namespace XrdCl
{
Expand Down Expand Up @@ -192,18 +193,13 @@ namespace XrdCl
//------------------------------------------------------------------------
// Handle the handshake message
//------------------------------------------------------------------------
void HandleHandShake();
void HandleHandShake( std::unique_ptr<Message> msg );

//------------------------------------------------------------------------
// Prepare the next step of the hand-shake procedure
//------------------------------------------------------------------------
void HandShakeNextStep( bool done );

//------------------------------------------------------------------------
// Read a message
//------------------------------------------------------------------------
XRootDStatus ReadMessage( Message *&toRead );

//------------------------------------------------------------------------
// Handle fault
//------------------------------------------------------------------------
Expand Down Expand Up @@ -278,7 +274,6 @@ namespace XrdCl
Stream *pStream;
std::string pStreamName;
Socket *pSocket;
Message *pHSIncoming;
Message *pOutgoing;
Message *pSignature;
XrdNetAddr pSockAddr;
Expand All @@ -287,7 +282,6 @@ namespace XrdCl
uint16_t pTimeoutResolution;
time_t pConnectionStarted;
time_t pConnectionTimeout;
bool pHeaderDone;
bool pOutMsgDone;
OutgoingMsgHandler *pOutHandler;
uint32_t pOutMsgSize;
Expand All @@ -297,6 +291,7 @@ namespace XrdCl

std::unique_ptr<MsgWriter> hswriter;
std::unique_ptr<AsyncMsgReader> rspreader;
std::unique_ptr<AsyncHSReader> hsreader;
};
}

Expand Down

0 comments on commit 6837632

Please sign in to comment.