Skip to content

Commit

Permalink
[XrdCl] Use t/o events to measure lapsed time in order to replay HS r…
Browse files Browse the repository at this point in the history
…equests.

1. Do NOT call SocketHandler methods from other thread than event-loop.
2. Make sure event-loop writing is inactive until the HS wait time elapses.
  • Loading branch information
simonmichal authored and gganis committed Nov 23, 2021
1 parent 35652f3 commit 399489d
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 53 deletions.
15 changes: 11 additions & 4 deletions src/XrdCl/XrdClAsyncHSWriter.hh
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ namespace XrdCl
const std::string &strmname ) : writestage( WriteRequest ),
socket( socket ),
strmname( strmname ),
outmsg( nullptr ),
outmsgsize( 0 )
outmsg( nullptr )
{
}

Expand All @@ -61,7 +60,16 @@ namespace XrdCl
{
writestage = WriteRequest;
outmsg.reset( msg );
outmsgsize = 0;
}

//------------------------------------------------------------------------
//! Replay the message that has been sent
//------------------------------------------------------------------------
inline void Replay()
{
if( !outmsg ) return;
writestage = WriteRequest;
outmsg->SetCursor( 0 );
}

//------------------------------------------------------------------------
Expand Down Expand Up @@ -139,7 +147,6 @@ namespace XrdCl
// The internal state of the the reader
//------------------------------------------------------------------------
std::unique_ptr<Message> outmsg;
uint32_t outmsgsize;
};
}

Expand Down
92 changes: 69 additions & 23 deletions src/XrdCl/XrdClAsyncSocketHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ namespace XrdCl
pHandShakeDone( false ),
pConnectionStarted( 0 ),
pConnectionTimeout( 0 ),
pHSWaitStarted( 0 ),
pHSWaitSeconds( 0 ),
pUrl( url ),
pTlsHandShakeOngoing( false )
{
Expand Down Expand Up @@ -234,6 +236,9 @@ namespace XrdCl
OnReadTimeout();
else
OnTimeoutWhileHandshaking();

if( pHSWaitSeconds )
CheckHSWait();
}

//--------------------------------------------------------------------------
Expand All @@ -244,12 +249,18 @@ namespace XrdCl
pLastActivity = time(0);
if( unlikely( pSocket->GetStatus() == Socket::Connecting ) )
OnConnectionReturn();
else if( unlikely( pTlsHandShakeOngoing ) )
OnTLSHandShake();
else if( likely( pHandShakeDone ) )
OnWrite();
else
OnWriteWhileHandshaking();
//------------------------------------------------------------------------
// Make sure we are not writing anything if we have been told to wait.
//------------------------------------------------------------------------
else if( pHSWaitSeconds == 0 )
{
if( unlikely( pTlsHandShakeOngoing ) )
OnTLSHandShake();
else if( likely( pHandShakeDone ) )
OnWrite();
else
OnWriteWhileHandshaking();
}
}

//--------------------------------------------------------------------------
Expand Down Expand Up @@ -429,9 +440,10 @@ namespace XrdCl
//--------------------------------------------------------------------------
if( st.code == suRetry ) return;
//--------------------------------------------------------------------------
// Bookkeeping ...
// Disable the uplink
// Note: at this point we don't deallocate the HS message as we might need
// to re-send it in case of a kXR_wait response
//--------------------------------------------------------------------------
hswriter->Reset();
if( !(st = DisableUplink()).IsOK() )
OnFaultWhileHandshaking( st );
}
Expand Down Expand Up @@ -540,12 +552,12 @@ namespace XrdCl
return;
}

//--------------------------------------------------------------------------
// We are handling a wait response and the transport handler told
// as to retry the request
//--------------------------------------------------------------------------
if( st.code == suRetry )
{
//------------------------------------------------------------------------
// We are handling a wait response and the transport handler told
// as to retry the request
//------------------------------------------------------------------------
if( waitSeconds >=0 )
{
time_t resendTime = ::time( 0 ) + waitSeconds;
Expand All @@ -561,15 +573,17 @@ namespace XrdCl
}
else
{
TaskManager *taskMgr = DefaultEnv::GetPostMaster()->GetTaskManager();
WaitTask *task = new WaitTask( this );
taskMgr->RegisterTask( task, resendTime );
//--------------------------------------------------------------------
// We need to wait before replaying the request
//--------------------------------------------------------------------
pHSWaitStarted = time( 0 );
pHSWaitSeconds = waitSeconds;
}
return;
}
//--------------------------------------------------------------------------
//------------------------------------------------------------------------
// We are re-sending a protocol request
//--------------------------------------------------------------------------
//------------------------------------------------------------------------
else if( pHandShakeData->out )
{
SendHSMsg();
Expand Down Expand Up @@ -711,9 +725,9 @@ namespace XrdCl
reqwriter.reset();
}

//------------------------------------------------------------------------
//----------------------------------------------------------------------------
// Carry out the TLS hand-shake
//------------------------------------------------------------------------
//----------------------------------------------------------------------------
XRootDStatus AsyncSocketHandler::DoTlsHandShake()
{
Log *log = DefaultEnv::GetLog();
Expand All @@ -739,9 +753,9 @@ namespace XrdCl
return st;
}

//------------------------------------------------------------------------
//----------------------------------------------------------------------------
// Handle read/write event if we are in the middle of a TLS hand-shake
//------------------------------------------------------------------------
//----------------------------------------------------------------------------
inline void AsyncSocketHandler::OnTLSHandShake()
{
XRootDStatus st = DoTlsHandShake();
Expand All @@ -751,6 +765,9 @@ namespace XrdCl
*pChannelData ) );
}

//----------------------------------------------------------------------------
// Prepare a HS writer for sending and enable uplink
//----------------------------------------------------------------------------
void AsyncSocketHandler::SendHSMsg()
{
if( !hswriter )
Expand All @@ -759,8 +776,27 @@ namespace XrdCl
"HS writer object missing!" ) );
return;
}
hswriter->Reset( pHandShakeData->out );
pHandShakeData->out = nullptr;
//--------------------------------------------------------------------------
// We only set a new HS message if this is not a replay due to kXR_wait
//--------------------------------------------------------------------------
if( !pHSWaitSeconds )
{
hswriter->Reset( pHandShakeData->out );
pHandShakeData->out = nullptr;
}
//--------------------------------------------------------------------------
// otherwise we replay the kXR_endsess request
//--------------------------------------------------------------------------
else
hswriter->Replay();
//--------------------------------------------------------------------------
// Make sure the wait state is reset
//--------------------------------------------------------------------------
pHSWaitSeconds = 0;
pHSWaitStarted = 0;
//--------------------------------------------------------------------------
// Enable writing so we can replay the HS message
//--------------------------------------------------------------------------
XRootDStatus st;
if( !(st = EnableUplink()).IsOK() )
{
Expand All @@ -779,4 +815,14 @@ namespace XrdCl
waitSeconds = rsp->body.wait.seconds;
return waitSeconds;
}

//------------------------------------------------------------------------
// Check if HS wait time elapsed
//------------------------------------------------------------------------
void AsyncSocketHandler::CheckHSWait()
{
time_t now = time( 0 );
if( now - pHSWaitStarted >= pHSWaitSeconds )
SendHSMsg();
}
}
34 changes: 8 additions & 26 deletions src/XrdCl/XrdClAsyncSocketHandler.hh
Original file line number Diff line number Diff line change
Expand Up @@ -41,31 +41,6 @@ namespace XrdCl
//----------------------------------------------------------------------------
class AsyncSocketHandler: public SocketHandler
{
//------------------------------------------------------------------------
// We need an extra task for rescheduling of HS request that received
// a wait response.
//------------------------------------------------------------------------
class WaitTask: public XrdCl::Task
{
public:
WaitTask( XrdCl::AsyncSocketHandler *handler ):
pHandler( handler )
{
std::ostringstream o;
o << "WaitTask for: 0x" << handler->pHandShakeData->out;
SetName( o.str() );
}

virtual time_t Run( time_t now )
{
pHandler->SendHSMsg();
return 0;
}

private:
XrdCl::AsyncSocketHandler *pHandler;
};

public:
//------------------------------------------------------------------------
//! Constructor
Expand Down Expand Up @@ -242,7 +217,7 @@ namespace XrdCl
void OnTLSHandShake();

//------------------------------------------------------------------------
// Retry hand shake message
// Prepare a HS writer for sending and enable uplink
//------------------------------------------------------------------------
void SendHSMsg();

Expand All @@ -255,6 +230,11 @@ namespace XrdCl
//------------------------------------------------------------------------
inline kXR_int32 HandleWaitRsp( Message *rsp );

//------------------------------------------------------------------------
// Check if HS wait time elapsed
//------------------------------------------------------------------------
void CheckHSWait();

//------------------------------------------------------------------------
// Data members
//------------------------------------------------------------------------
Expand All @@ -272,6 +252,8 @@ namespace XrdCl
time_t pConnectionStarted;
time_t pConnectionTimeout;
time_t pLastActivity;
time_t pHSWaitStarted;
time_t pHSWaitSeconds;
URL pUrl;
bool pTlsHandShakeOngoing;

Expand Down

0 comments on commit 399489d

Please sign in to comment.