diff --git a/src/XrdCl/XrdClAsyncSocketHandler.cc b/src/XrdCl/XrdClAsyncSocketHandler.cc index 213f553bc14..8a18e201f9a 100644 --- a/src/XrdCl/XrdClAsyncSocketHandler.cc +++ b/src/XrdCl/XrdClAsyncSocketHandler.cc @@ -777,7 +777,12 @@ namespace XrdCl pHandShakeData->in = pHSIncoming; pHSIncoming = 0; st = pTransport->HandShake( pHandShakeData, *pChannelData ); - ++pHandShakeData->step; + + //-------------------------------------------------------------------------- + // Deal with wait responses + //-------------------------------------------------------------------------- + kXR_int32 waitSeconds = HandleWaitRsp( pHandShakeData->in ); + delete pHandShakeData->in; pHandShakeData->in = 0; @@ -787,6 +792,38 @@ namespace XrdCl return; } + //-------------------------------------------------------------------------- + // We are handling a wait response and the transport handler told + // as to retry the request + //-------------------------------------------------------------------------- + if( st.code == suRetry && waitSeconds >= 0 ) + { + time_t resendTime = ::time( 0 ) + waitSeconds; + if( resendTime > pConnectionStarted + pConnectionTimeout ) + { + Log *log = DefaultEnv::GetLog(); + log->Error( AsyncSockMsg, + "[%s] Wont retry kXR_endsess request because would" + "reach connection timeout.", + pStreamName.c_str() ); + + OnFaultWhileHandshaking( Status( stError, errSocketTimeout ) ); + } + else + { + TaskManager *taskMgr = DefaultEnv::GetPostMaster()->GetTaskManager(); + WaitTask *task = new WaitTask( this, pHandShakeData->out ); + pHandShakeData->out = 0; + taskMgr->RegisterTask( task, resendTime ); + } + return; + } + + //-------------------------------------------------------------------------- + // We successfully proceeded to the next step + //-------------------------------------------------------------------------- + ++pHandShakeData->step; + //-------------------------------------------------------------------------- // The transport handler gave us something to write //-------------------------------------------------------------------------- @@ -1017,4 +1054,27 @@ namespace XrdCl bytesWritten = 0; ToIov( chunks, offset, iov ); } + + + void AsyncSocketHandler::RetryHSMsg( Message *msg ) + { + pHSOutgoing = msg; + Status st; + if( !(st = EnableUplink()).IsOK() ) + { + OnFaultWhileHandshaking( st ); + return; + } + } + + kXR_int32 AsyncSocketHandler::HandleWaitRsp( Message *msg ) + { + // It would be more coherent if this could be done in the + // transport layer, unfortunately the API does not allow it. + kXR_int32 waitSeconds = -1; + ServerResponse *rsp = (ServerResponse*)msg->GetBuffer(); + if( rsp->hdr.status == kXR_wait ) + waitSeconds = rsp->body.wait.seconds; + return waitSeconds; + } } diff --git a/src/XrdCl/XrdClAsyncSocketHandler.hh b/src/XrdCl/XrdClAsyncSocketHandler.hh index 6254ecef057..22514edec87 100644 --- a/src/XrdCl/XrdClAsyncSocketHandler.hh +++ b/src/XrdCl/XrdClAsyncSocketHandler.hh @@ -24,6 +24,7 @@ #include "XrdCl/XrdClDefaultEnv.hh" #include "XrdCl/XrdClPoller.hh" #include "XrdCl/XrdClPostMasterInterfaces.hh" +#include "XrdCl/XrdClTaskManager.hh" #include #include @@ -38,6 +39,32 @@ namespace XrdCl //---------------------------------------------------------------------------- class AsyncSocketHandler: public SocketHandler { + //------------------------------------------------------------------------ + // We need an extra task for rescheduling of HS request that received + // a wait response. + //------------------------------------------------------------------------ + class WaitTask: public XrdCl::Task + { + public: + WaitTask( XrdCl::AsyncSocketHandler *handler, XrdCl::Message *msg ): + pHandler( handler ), pMsg( msg ) + { + std::ostringstream o; + o << "WaitTask for: 0x" << msg; + SetName( o.str() ); + } + + virtual time_t Run( time_t now ) + { + pHandler->RetryHSMsg( pMsg ); + return 0; + } + + private: + XrdCl::AsyncSocketHandler *pHandler; + XrdCl::Message *pMsg; + }; + public: //------------------------------------------------------------------------ //! Constructor @@ -229,6 +256,20 @@ namespace XrdCl iovec *iov, int &bytesWritten ); + //------------------------------------------------------------------------ + // Retry hand shake message + //------------------------------------------------------------------------ + void RetryHSMsg( Message *msg ); + + //------------------------------------------------------------------------ + // Extract the value of a wait response + // + // @param rsp : the server response + // @return : if rsp is a wait response then its value + // otherwise -1 + //------------------------------------------------------------------------ + inline kXR_int32 HandleWaitRsp( Message *rsp ); + //------------------------------------------------------------------------ // Data members //------------------------------------------------------------------------ diff --git a/src/XrdCl/XrdClXRootDTransport.cc b/src/XrdCl/XrdClXRootDTransport.cc index 979e24698e0..767acf40fc8 100644 --- a/src/XrdCl/XrdClXRootDTransport.cc +++ b/src/XrdCl/XrdClXRootDTransport.cc @@ -462,17 +462,16 @@ namespace XrdCl { Status st = ProcessEndSessionResp( handShakeData, info ); - if( !st.IsOK() ) - { - sInfo.status = XRootDStreamInfo::Broken; - return st; - } - if( st.IsOK() && st.code == suDone ) { sInfo.status = XRootDStreamInfo::Connected; - return st; } + else if( !st.IsOK() ) + { + sInfo.status = XRootDStreamInfo::Broken; + } + + return st; } return Status( stOK, suDone ); @@ -2020,19 +2019,38 @@ namespace XrdCl ServerResponse *rsp = (ServerResponse*)hsData->in->GetBuffer(); + // If we're good, we're good! + if( rsp->hdr.status == kXR_ok ) + return Status(); + + // we ignore not found errors as such an error means the connection + // has been already terminated + if( rsp->hdr.status == kXR_error && rsp->body.error.errnum == kXR_NotFound ) + return Status(); + + // other errors if( rsp->hdr.status == kXR_error ) { - char *errorMsg = new char[rsp->hdr.dlen-3]; errorMsg[rsp->hdr.dlen-4] = 0; - memcpy( errorMsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 ); - log->Debug( XRootDTransportMsg, "[%s] Got error response to " + std::string errorMsg( rsp->body.error.errmsg, rsp->hdr.dlen - 4 ); + log->Error( XRootDTransportMsg, "[%s] Got error response to " "kXR_endsess: %s", hsData->streamName.c_str(), - errorMsg ); - delete [] errorMsg; - // we don't really care if it failed - // return Status( stFatal, errLoginFailed ); + errorMsg.c_str() ); + return Status( stFatal, errHandShakeFailed ); } - return Status(); + // Wait Response. + if( rsp->hdr.status == kXR_wait ) + { + std::string msg( rsp->body.wait.infomsg, rsp->hdr.dlen - 4 ); + log->Info( XRootDTransportMsg, "[%s] Got wait response to " + "kXR_endsess: %s", hsData->streamName.c_str(), + msg.c_str() ); + hsData->out = GenerateEndSession( hsData, info ); + return Status( stOK, suRetry ); + } + + // Any other response is protocol violation + return Status( stError, errDataError ); } //----------------------------------------------------------------------------