Skip to content

Commit

Permalink
[XrdCl] Correctly handler error/wait response to endsess request.
Browse files Browse the repository at this point in the history
  • Loading branch information
simonmichal committed Mar 27, 2018
1 parent e288090 commit 814e8b1
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 16 deletions.
62 changes: 61 additions & 1 deletion src/XrdCl/XrdClAsyncSocketHandler.cc
Expand Up @@ -777,7 +777,12 @@ namespace XrdCl
pHandShakeData->in = pHSIncoming;
pHSIncoming = 0;
st = pTransport->HandShake( pHandShakeData, *pChannelData );
++pHandShakeData->step;

//--------------------------------------------------------------------------
// Deal with wait responses
//--------------------------------------------------------------------------
kXR_int32 waitSeconds = HandleWaitRsp( pHandShakeData->in );

delete pHandShakeData->in;
pHandShakeData->in = 0;

Expand All @@ -787,6 +792,38 @@ namespace XrdCl
return;
}

//--------------------------------------------------------------------------
// We are handling a wait response and the transport handler told
// as to retry the request
//--------------------------------------------------------------------------
if( st.code == suRetry && waitSeconds >= 0 )
{
time_t resendTime = ::time( 0 ) + waitSeconds;
if( resendTime > pConnectionStarted + pConnectionTimeout )
{
Log *log = DefaultEnv::GetLog();
log->Error( AsyncSockMsg,
"[%s] Wont retry kXR_endsess request because would"
"reach connection timeout.",
pStreamName.c_str() );

OnFaultWhileHandshaking( Status( stError, errSocketTimeout ) );
}
else
{
TaskManager *taskMgr = DefaultEnv::GetPostMaster()->GetTaskManager();
WaitTask *task = new WaitTask( this, pHandShakeData->out );
pHandShakeData->out = 0;
taskMgr->RegisterTask( task, resendTime );
}
return;
}

//--------------------------------------------------------------------------
// We successfully proceeded to the next step
//--------------------------------------------------------------------------
++pHandShakeData->step;

//--------------------------------------------------------------------------
// The transport handler gave us something to write
//--------------------------------------------------------------------------
Expand Down Expand Up @@ -1017,4 +1054,27 @@ namespace XrdCl
bytesWritten = 0;
ToIov( chunks, offset, iov );
}


void AsyncSocketHandler::RetryHSMsg( Message *msg )
{
pHSOutgoing = msg;
Status st;
if( !(st = EnableUplink()).IsOK() )
{
OnFaultWhileHandshaking( st );
return;
}
}

kXR_int32 AsyncSocketHandler::HandleWaitRsp( Message *msg )
{
// It would be more coherent if this could be done in the
// transport layer, unfortunately the API does not allow it.
kXR_int32 waitSeconds = -1;
ServerResponse *rsp = (ServerResponse*)msg->GetBuffer();
if( rsp->hdr.status == kXR_wait )
waitSeconds = rsp->body.wait.seconds;
return waitSeconds;
}
}
41 changes: 41 additions & 0 deletions src/XrdCl/XrdClAsyncSocketHandler.hh
Expand Up @@ -24,6 +24,7 @@
#include "XrdCl/XrdClDefaultEnv.hh"
#include "XrdCl/XrdClPoller.hh"
#include "XrdCl/XrdClPostMasterInterfaces.hh"
#include "XrdCl/XrdClTaskManager.hh"

#include <sys/types.h>
#include <sys/socket.h>
Expand All @@ -38,6 +39,32 @@ namespace XrdCl
//----------------------------------------------------------------------------
class AsyncSocketHandler: public SocketHandler
{
//------------------------------------------------------------------------
// We need an extra task for rescheduling of HS request that received
// a wait response.
//------------------------------------------------------------------------
class WaitTask: public XrdCl::Task
{
public:
WaitTask( XrdCl::AsyncSocketHandler *handler, XrdCl::Message *msg ):
pHandler( handler ), pMsg( msg )
{
std::ostringstream o;
o << "WaitTask for: 0x" << msg;
SetName( o.str() );
}

virtual time_t Run( time_t now )
{
pHandler->RetryHSMsg( pMsg );
return 0;
}

private:
XrdCl::AsyncSocketHandler *pHandler;
XrdCl::Message *pMsg;
};

public:
//------------------------------------------------------------------------
//! Constructor
Expand Down Expand Up @@ -229,6 +256,20 @@ namespace XrdCl
iovec *iov,
int &bytesWritten );

//------------------------------------------------------------------------
// Retry hand shake message
//------------------------------------------------------------------------
void RetryHSMsg( Message *msg );

//------------------------------------------------------------------------
// Extract the value of a wait response
//
// @param rsp : the server response
// @return : if rsp is a wait response then its value
// otherwise -1
//------------------------------------------------------------------------
inline kXR_int32 HandleWaitRsp( Message *rsp );

//------------------------------------------------------------------------
// Data members
//------------------------------------------------------------------------
Expand Down
48 changes: 33 additions & 15 deletions src/XrdCl/XrdClXRootDTransport.cc
Expand Up @@ -462,17 +462,16 @@ namespace XrdCl
{
Status st = ProcessEndSessionResp( handShakeData, info );

if( !st.IsOK() )
{
sInfo.status = XRootDStreamInfo::Broken;
return st;
}

if( st.IsOK() && st.code == suDone )
{
sInfo.status = XRootDStreamInfo::Connected;
return st;
}
else if( !st.IsOK() )
{
sInfo.status = XRootDStreamInfo::Broken;
}

return st;
}

return Status( stOK, suDone );
Expand Down Expand Up @@ -2020,19 +2019,38 @@ namespace XrdCl

ServerResponse *rsp = (ServerResponse*)hsData->in->GetBuffer();

// If we're good, we're good!
if( rsp->hdr.status == kXR_ok )
return Status();

// we ignore not found errors as such an error means the connection
// has been already terminated
if( rsp->hdr.status == kXR_error && rsp->body.error.errnum == kXR_NotFound )
return Status();

// other errors
if( rsp->hdr.status == kXR_error )
{
char *errorMsg = new char[rsp->hdr.dlen-3]; errorMsg[rsp->hdr.dlen-4] = 0;
memcpy( errorMsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
log->Debug( XRootDTransportMsg, "[%s] Got error response to "
std::string errorMsg( rsp->body.error.errmsg, rsp->hdr.dlen - 4 );
log->Error( XRootDTransportMsg, "[%s] Got error response to "
"kXR_endsess: %s", hsData->streamName.c_str(),
errorMsg );
delete [] errorMsg;
// we don't really care if it failed
// return Status( stFatal, errLoginFailed );
errorMsg.c_str() );
return Status( stFatal, errHandShakeFailed );
}

return Status();
// Wait Response.
if( rsp->hdr.status == kXR_wait )
{
std::string msg( rsp->body.wait.infomsg, rsp->hdr.dlen - 4 );
log->Info( XRootDTransportMsg, "[%s] Got wait response to "
"kXR_endsess: %s", hsData->streamName.c_str(),
msg.c_str() );
hsData->out = GenerateEndSession( hsData, info );
return Status( stOK, suRetry );
}

// Any other response is protocol violation
return Status( stError, errDataError );
}

//----------------------------------------------------------------------------
Expand Down

0 comments on commit 814e8b1

Please sign in to comment.