diff --git a/src/XrdCl/XrdClAsyncMsgReader.hh b/src/XrdCl/XrdClAsyncMsgReader.hh index 1233b5744b5..cae92bfeab0 100644 --- a/src/XrdCl/XrdClAsyncMsgReader.hh +++ b/src/XrdCl/XrdClAsyncMsgReader.hh @@ -135,6 +135,7 @@ namespace XrdCl readstage = ReadRawData; continue; } + //---------------------------------------------------------------- // The next step is to read the message body //---------------------------------------------------------------- @@ -168,6 +169,23 @@ namespace XrdCl continue; } //------------------------------------------------------------------ + // kXR_status is special as it can have both body and raw data, + // handle it separately + //------------------------------------------------------------------ + case ReadMore: + { + XRootDStatus st = xrdTransport.GetMore( *inmsg, &socket ); + if( !st.IsOK() || st.code == suRetry ) + return st; + inmsgsize = inmsg->GetCursor(); + + //---------------------------------------------------------------- + // The next step is to finalize the read + //---------------------------------------------------------------- + readstage = ReadDone; + continue; + } + //------------------------------------------------------------------ // We need to call a raw message handler to get the data from the // socket //------------------------------------------------------------------ @@ -196,94 +214,38 @@ namespace XrdCl return st; inmsgsize = inmsg->GetCursor(); - //---------------------------------------------------------------- - // Now check if there are some additional raw data to be read - //---------------------------------------------------------------- - if( inhandler ) - { - //-------------------------------------------------------------- - // The next step is to finalize the read - //-------------------------------------------------------------- - readstage = ReadDone; - continue; - } - - uint16_t action = strm.InspectStatusRsp( substrmnb, - inhandler ); - - if( action & MsgHandler::Corrupted ) - return XRootDStatus( stError, errCorruptedHeader ); - - if( action & MsgHandler::Raw ) - { - //-------------------------------------------------------------- - // The next step is to read the raw data - //-------------------------------------------------------------- - readstage = ReadRawData; - continue; - } - - if( action & MsgHandler::More ) - { - //-------------------------------------------------------------- - // The next step is to read the additional data in the message - // body - //-------------------------------------------------------------- - readstage = ReadMsgBody; - continue; - } - - //---------------------------------------------------------------- - // Unless we've got a kXR_status message and no handler the - // read is done - //---------------------------------------------------------------- - ServerResponse *rsphdr = (ServerResponse *)inmsg->GetBuffer(); - if( !( action & MsgHandler::RemoveHandler ) || - rsphdr->hdr.status != kXR_status || - inmsg->GetSize() < sizeof( ServerResponseStatus ) ) - { - readstage = ReadDone; - continue; - } - - //---------------------------------------------------------------- - // There is no handler and we have a kXR_status message. If we - // have already read all the message then we're done. - //---------------------------------------------------------------- - ServerResponseStatus *rspst = (ServerResponseStatus*)inmsg->GetBuffer(); - const uint32_t hdrSize = rspst->hdr.dlen; - if( inmsg->GetSize() != hdrSize + 8 ) - { - readstage = ReadDone; - continue; - } //---------------------------------------------------------------- - // Only the header of kXR_status has been read. Unmarshall the - // header and if if there is more body data call GetBody() again. + // kXR_status response needs special handling as it can have + // either (body + raw data) or (body + additional body data) //---------------------------------------------------------------- - const uint16_t reqType = rspst->bdy.requestid + kXR_1stRequest; - st = XRootDTransport::UnMarshalStatusBody( *inmsg, reqType ); - - if( !st.IsOK() && st.code == errDataError ) + if( IsStatusRsp() ) { - log->Error( AsyncSockMsg, "[%s] Failed to unmarshall " - "corrupted status body in message 0x%x.", - strmname.c_str(), inmsg.get() ); - return XRootDStatus( stError, errCorruptedHeader ); - } - if( !st.IsOK() ) - { - log->Error( AsyncSockMsg, "[%s] Failed to unmarshall " - "status body of message 0x%x.", - strmname.c_str(), inmsg.get() ); - readstage = ReadDone; - continue; - } - if ( rspst->bdy.dlen != 0 ) - { - readstage = ReadMsgBody; - continue; + uint16_t action = strm.InspectStatusRsp( substrmnb, + inhandler ); + + if( action & MsgHandler::Corrupted ) + return XRootDStatus( stError, errCorruptedHeader ); + + if( action & MsgHandler::Raw ) + { + //-------------------------------------------------------------- + // The next step is to read the raw data + //-------------------------------------------------------------- + readstage = ReadRawData; + continue; + } + + if( action & MsgHandler::More ) + { + + //-------------------------------------------------------------- + // The next step is to read the additional data in the message + // body + //-------------------------------------------------------------- + readstage = ReadMore; + continue; + } } //---------------------------------------------------------------- @@ -342,6 +304,12 @@ namespace XrdCl return XRootDStatus(); } + inline bool IsStatusRsp() + { + ServerResponseHeader *hdr = (ServerResponseHeader*)inmsg->GetBuffer(); + return ( hdr->status == kXR_status ); + } + inline bool HasEmbeddedRsp() { ServerResponseBody_Attn *attn = (ServerResponseBody_Attn*)inmsg->GetBuffer( 8 ); @@ -356,6 +324,7 @@ namespace XrdCl ReadStart, //< the next step is to initialize the read ReadHeader, //< the next step is to read the header ReadAttn, //< the next step is to read attn action code + ReadMore, //< the next step is to read more status body ReadMsgBody, //< the next step is to read the body ReadRawData, //< the next step is to read the raw data ReadDone //< the next step is to finalize the read diff --git a/src/XrdCl/XrdClPostMasterInterfaces.hh b/src/XrdCl/XrdClPostMasterInterfaces.hh index 187a9ce581b..8e21549b8fe 100644 --- a/src/XrdCl/XrdClPostMasterInterfaces.hh +++ b/src/XrdCl/XrdClPostMasterInterfaces.hh @@ -358,6 +358,19 @@ namespace XrdCl //------------------------------------------------------------------------ virtual XRootDStatus GetBody( Message &message, Socket *socket ) = 0; + //------------------------------------------------------------------------ + //! Read more of the message body from the socket, the socket is + //! non-blocking the method may be called multiple times - see GetHeader + //! for details + //! + //! @param message the message buffer containing the header + //! @param socket the socket + //! @return stOK & suDone if the whole message has been processed + //! stOK & suRetry if more data is needed + //! stError on failure + //------------------------------------------------------------------------ + virtual XRootDStatus GetMore( Message &message, Socket *socket ) = 0; + //------------------------------------------------------------------------ //! Initialize channel //------------------------------------------------------------------------ diff --git a/src/XrdCl/XrdClXRootDMsgHandler.cc b/src/XrdCl/XrdClXRootDMsgHandler.cc index d31f050e520..81e41d21bb1 100644 --- a/src/XrdCl/XrdClXRootDMsgHandler.cc +++ b/src/XrdCl/XrdClXRootDMsgHandler.cc @@ -311,33 +311,32 @@ namespace XrdCl // Ignore malformed status response //-------------------------------------------------------------------------- if( pResponse->GetSize() < sizeof( ServerResponseStatus ) ) - return Ignore; + { + log->Error( XRootDMsg, "[%s] kXR_status: invalid message size.", pUrl.GetHostId().c_str() ); + return Corrupted; + } ClientRequest *req = (ClientRequest *)pRequest->GetBuffer(); uint16_t reqId = ntohs( req->header.requestid ); //-------------------------------------------------------------------------- // Unmarshal the status body //-------------------------------------------------------------------------- - if( !pRspStatusBodyUnMarshaled ) - { - XRootDStatus st = XRootDTransport::UnMarshalStatusBody( *pResponse, reqId ); + XRootDStatus st = XRootDTransport::UnMarshalStatusBody( *pResponse, reqId ); - if( !st.IsOK() && st.code == errDataError ) - { - log->Error( XRootDMsg, "[%s] %s", pUrl.GetHostId().c_str(), - st.GetErrorMessage().c_str() ); - return Corrupted; - } + if( !st.IsOK() && st.code == errDataError ) + { + log->Error( XRootDMsg, "[%s] %s", pUrl.GetHostId().c_str(), + st.GetErrorMessage().c_str() ); + return Corrupted; + } - if( !st.IsOK() ) - { - log->Error( XRootDMsg, "[%s] Failed to unmarshall status body.", - pUrl.GetHostId().c_str() ); - pStatus = st; - HandleRspOrQueue(); - return Ignore; - } - pRspStatusBodyUnMarshaled = true; + if( !st.IsOK() ) + { + log->Error( XRootDMsg, "[%s] Failed to unmarshall status body.", + pUrl.GetHostId().c_str() ); + pStatus = st; + HandleRspOrQueue(); + return Ignore; } //-------------------------------------------------------------------------- @@ -377,27 +376,6 @@ namespace XrdCl if( size_t( sizeof( ServerResponseHeader ) + rspst->status.hdr.dlen + rspst->status.bdy.dlen ) > pResponse->GetCursor() ) action |= More; - // if we already have this data we need to unmarshal it - else if( !pRspPgWrtRetrnsmReqUnMarshalled ) - { - XRootDStatus st = XRootDTransport::UnMarchalStatusCSE( *pResponse ); - if( !st.IsOK() && st.code == errDataError ) - { - log->Error( XRootDMsg, "[%s] %s", pUrl.GetHostId().c_str(), - st.GetErrorMessage().c_str() ); - return Corrupted; - } - - if( !st.IsOK() ) - { - log->Error( XRootDMsg, "[%s] Failed to unmarshall status body.", - pUrl.GetHostId().c_str() ); - pStatus = st; - HandleRspOrQueue(); - return Ignore; - } - pRspPgWrtRetrnsmReqUnMarshalled = true; - } } return action; @@ -1126,7 +1104,6 @@ namespace XrdCl void XRootDMsgHandler::PartialReceived() { pTimeoutFence.store( false, std::memory_order_relaxed ); // Take down the timeout fence - pRspStatusBodyUnMarshaled = false; } //---------------------------------------------------------------------------- diff --git a/src/XrdCl/XrdClXRootDMsgHandler.hh b/src/XrdCl/XrdClXRootDMsgHandler.hh index ea9c35176a8..3a344e66c51 100644 --- a/src/XrdCl/XrdClXRootDMsgHandler.hh +++ b/src/XrdCl/XrdClXRootDMsgHandler.hh @@ -175,10 +175,7 @@ namespace XrdCl pCV( 0 ), - pSslErrCnt( 0 ), - - pRspStatusBodyUnMarshaled( false ), - pRspPgWrtRetrnsmReqUnMarshalled( false ) + pSslErrCnt( 0 ) { pPostMaster = DefaultEnv::GetPostMaster(); if( msg->GetSessionId() ) @@ -670,13 +667,6 @@ namespace XrdCl // Count of consecutive `errTlsSslError` errors //------------------------------------------------------------------------ size_t pSslErrCnt; - - //------------------------------------------------------------------------ - // Keep track if respective parts of kXR_status response have been - // unmarshaled. - //------------------------------------------------------------------------ - bool pRspStatusBodyUnMarshaled; - bool pRspPgWrtRetrnsmReqUnMarshalled; }; } diff --git a/src/XrdCl/XrdClXRootDTransport.cc b/src/XrdCl/XrdClXRootDTransport.cc index aa0d20e22a4..f869327aa33 100644 --- a/src/XrdCl/XrdClXRootDTransport.cc +++ b/src/XrdCl/XrdClXRootDTransport.cc @@ -348,34 +348,59 @@ namespace XrdCl { //-------------------------------------------------------------------------- // Retrieve the body - // In case of non kXR_status responses we read all the response, including - // data. For kXR_status responses we first read only the remainder of the - // header. The header must then be unmarshalled, and then a second call of - // GetBody (repeated for suRetry as needed) will read the data. //-------------------------------------------------------------------------- size_t leftToBeRead = 0; uint32_t bodySize = 0; ServerResponseHeader* rsphdr = (ServerResponseHeader*)message.GetBuffer(); bodySize = rsphdr->dlen; - if( rsphdr->status == kXR_status ) + + if( message.GetSize() < bodySize + 8 ) + message.ReAllocate( bodySize + 8 ); + + leftToBeRead = bodySize-(message.GetCursor()-8); + while( leftToBeRead ) { - if( message.GetCursor() >= bodySize+8 ) // we read everything up to the data[] - { - const size_t stlen = sizeof( ServerResponseStatus ); - if( bodySize+8 < stlen ) - return XRootDStatus( stError, errInvalidMessage, 0, - "kXR_status: invalid message size." ); + int bytesRead = 0; + XRootDStatus status = socket->Read( message.GetBufferAtCursor(), leftToBeRead, bytesRead ); - ServerResponseStatus *rspst = (ServerResponseStatus*)message.GetBuffer(); - bodySize = rspst->hdr.dlen + rspst->bdy.dlen; - } + if( !status.IsOK() || status.code == suRetry ) + return status; + leftToBeRead -= bytesRead; + message.AdvanceCursor( bytesRead ); } + return XRootDStatus( stOK, suDone ); + } + + //---------------------------------------------------------------------------- + // Read more of the message body from socket + //---------------------------------------------------------------------------- + XRootDStatus XRootDTransport::GetMore( Message &message, Socket *socket ) + { + ServerResponseHeader* rsphdr = (ServerResponseHeader*)message.GetBuffer(); + if( rsphdr->status != kXR_status ) + return XRootDStatus( stError, errInvalidOp ); + + //-------------------------------------------------------------------------- + // In case of non kXR_status responses we read all the response, including + // data. For kXR_status responses we first read only the remainder of the + // header. The header must then be unmarshalled, and then a second call to + // GetMore (repeated for suRetry as needed) will read the data. + //-------------------------------------------------------------------------- + + uint32_t bodySize = rsphdr->dlen; + if( bodySize+8 < sizeof( ServerResponseStatus ) ) + return XRootDStatus( stError, errInvalidMessage, 0, + "kXR_status: invalid message size." ); + + ServerResponseStatus *rspst = (ServerResponseStatus*)message.GetBuffer(); + bodySize += rspst->bdy.dlen; + if( message.GetSize() < bodySize + 8 ) message.ReAllocate( bodySize + 8 ); - leftToBeRead = bodySize-(message.GetCursor()-8); + size_t leftToBeRead = bodySize-(message.GetCursor()-8); while( leftToBeRead ) { int bytesRead = 0; @@ -388,6 +413,23 @@ namespace XrdCl message.AdvanceCursor( bytesRead ); } + // Unmarchal to message body + Log *log = DefaultEnv::GetLog(); + XRootDStatus st = XRootDTransport::UnMarchalStatusMore( message ); + if( !st.IsOK() && st.code == errDataError ) + { + log->Error( XRootDTransportMsg, "[msg: 0x%x] %s", &message, + st.GetErrorMessage().c_str() ); + return st; + } + + if( !st.IsOK() ) + { + log->Error( XRootDTransportMsg, "[msg: 0x%x] Failed to unmarshall status body.", + &message ); + return st; + } + return XRootDStatus( stOK, suDone ); } @@ -1334,53 +1376,64 @@ namespace XrdCl return XRootDStatus(); } - //---------------------------------------------------------------------------- - // Unmarshall the correction-segment of the status response for pgwrite - //---------------------------------------------------------------------------- - XRootDStatus XRootDTransport::UnMarchalStatusCSE( Message &msg ) + XRootDStatus XRootDTransport::UnMarchalStatusMore( Message &msg ) { ServerResponseV2 *rsp = (ServerResponseV2*)msg.GetBuffer(); - //-------------------------------------------------------------------------- - // If there's no additional data there's nothing to unmarshal - //-------------------------------------------------------------------------- - if( rsp->status.bdy.dlen == 0 ) return XRootDStatus(); - //-------------------------------------------------------------------------- - // If there's not enough data to form correction-segment report an error - //-------------------------------------------------------------------------- - if( size_t( rsp->status.bdy.dlen ) < sizeof( ServerResponseBody_pgWrCSE ) ) - return XRootDStatus( stError, errInvalidMessage, 0, - "kXR_status: invalid message size." ); + uint16_t reqType = rsp->status.bdy.requestid + kXR_1stRequest; - //-------------------------------------------------------------------------- - // Calculate the crc32c for the additional data - //-------------------------------------------------------------------------- - ServerResponseBody_pgWrCSE *cse = (ServerResponseBody_pgWrCSE*)msg.GetBuffer( sizeof( ServerResponseV2 ) ); - cse->cseCRC = ntohl( cse->cseCRC ); - size_t length = rsp->status.bdy.dlen - sizeof( uint32_t ); - void* buffer = msg.GetBuffer( sizeof( ServerResponseV2 ) + sizeof( uint32_t ) ); - uint32_t crcval = XrdOucCRC::Calc32C( buffer, length ); - - //-------------------------------------------------------------------------- - // Do the integrity checks - //-------------------------------------------------------------------------- - if( crcval != cse->cseCRC ) + switch( reqType ) { - return XRootDStatus( stError, errDataError, 0, "kXR_status response header " - "corrupted (crc32c integrity check failed)." ); - } + case kXR_pgwrite: + { + //-------------------------------------------------------------------------- + // If there's no additional data there's nothing to unmarshal + //-------------------------------------------------------------------------- + if( rsp->status.bdy.dlen == 0 ) return XRootDStatus(); + //-------------------------------------------------------------------------- + // If there's not enough data to form correction-segment report an error + //-------------------------------------------------------------------------- + if( size_t( rsp->status.bdy.dlen ) < sizeof( ServerResponseBody_pgWrCSE ) ) + return XRootDStatus( stError, errInvalidMessage, 0, + "kXR_status: invalid message size." ); + + //-------------------------------------------------------------------------- + // Calculate the crc32c for the additional data + //-------------------------------------------------------------------------- + ServerResponseBody_pgWrCSE *cse = (ServerResponseBody_pgWrCSE*)msg.GetBuffer( sizeof( ServerResponseV2 ) ); + cse->cseCRC = ntohl( cse->cseCRC ); + size_t length = rsp->status.bdy.dlen - sizeof( uint32_t ); + void* buffer = msg.GetBuffer( sizeof( ServerResponseV2 ) + sizeof( uint32_t ) ); + uint32_t crcval = XrdOucCRC::Calc32C( buffer, length ); + + //-------------------------------------------------------------------------- + // Do the integrity checks + //-------------------------------------------------------------------------- + if( crcval != cse->cseCRC ) + { + return XRootDStatus( stError, errDataError, 0, "kXR_status response header " + "corrupted (crc32c integrity check failed)." ); + } - cse->dlFirst = ntohs( cse->dlFirst ); - cse->dlLast = ntohs( cse->dlLast ); + cse->dlFirst = ntohs( cse->dlFirst ); + cse->dlLast = ntohs( cse->dlLast ); - size_t pgcnt = ( rsp->status.bdy.dlen - sizeof( ServerResponseBody_pgWrCSE ) ) / - sizeof( kXR_int64 ); - kXR_int64 *pgoffs = (kXR_int64*)msg.GetBuffer( sizeof( ServerResponseV2 ) + - sizeof( ServerResponseBody_pgWrCSE ) ); + size_t pgcnt = ( rsp->status.bdy.dlen - sizeof( ServerResponseBody_pgWrCSE ) ) / + sizeof( kXR_int64 ); + kXR_int64 *pgoffs = (kXR_int64*)msg.GetBuffer( sizeof( ServerResponseV2 ) + + sizeof( ServerResponseBody_pgWrCSE ) ); - for( size_t i = 0; i < pgcnt; ++i ) - pgoffs[i] = ntohll( pgoffs[i] ); + for( size_t i = 0; i < pgcnt; ++i ) + pgoffs[i] = ntohll( pgoffs[i] ); - return XRootDStatus(); + return XRootDStatus(); + break; + } + + default: + break; + } + + return XRootDStatus( stError, errNotSupported ); } //---------------------------------------------------------------------------- diff --git a/src/XrdCl/XrdClXRootDTransport.hh b/src/XrdCl/XrdClXRootDTransport.hh index f66a68f7c5c..03386cb677b 100644 --- a/src/XrdCl/XrdClXRootDTransport.hh +++ b/src/XrdCl/XrdClXRootDTransport.hh @@ -83,6 +83,19 @@ namespace XrdCl //------------------------------------------------------------------------ virtual XRootDStatus GetBody( Message &message, Socket *socket ); + //------------------------------------------------------------------------ + //! Read more of the message body from the socket, the socket is + //! non-blocking the method may be called multiple times - see GetHeader + //! for details + //! + //! @param message the message buffer containing the header + //! @param socket the socket + //! @return stOK & suDone if the whole message has been processed + //! stOK & suRetry if more data is needed + //! stError on failure + //------------------------------------------------------------------------ + virtual XRootDStatus GetMore( Message &message, Socket *socket ); + //------------------------------------------------------------------------ //! Initialize channel //------------------------------------------------------------------------ @@ -190,7 +203,7 @@ namespace XrdCl //------------------------------------------------------------------------ //! Unmarshall the correction-segment of the status response for pgwrite //------------------------------------------------------------------------ - static XRootDStatus UnMarchalStatusCSE( Message &msg ); + static XRootDStatus UnMarchalStatusMore( Message &msg ); //------------------------------------------------------------------------ //! Unmarshall the header incoming message