Skip to content

Commit

Permalink
Merge pull request #1760 from smithdh/xrdcl-kxrstatus-sync
Browse files Browse the repository at this point in the history
[XrdCl] Avoid situation where client does not read all of a network message
  • Loading branch information
simonmichal committed Aug 15, 2022
2 parents 3756f1b + ef245a0 commit 25e600b
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 61 deletions.
114 changes: 87 additions & 27 deletions src/XrdCl/XrdClAsyncMsgReader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -199,36 +199,96 @@ namespace XrdCl
//----------------------------------------------------------------
// Now check if there are some additional raw data to be read
//----------------------------------------------------------------
if( !inhandler )
if( inhandler )
{
//--------------------------------------------------------------
// The next step is to finalize the read
//--------------------------------------------------------------
readstage = ReadDone;
continue;
}

uint16_t action = strm.InspectStatusRsp( substrmnb,
inhandler );

if( action & MsgHandler::Corrupted )
return XRootDStatus( stError, errCorruptedHeader );

if( action & MsgHandler::Raw )
{
//--------------------------------------------------------------
// The next step is to read the raw data
//--------------------------------------------------------------
readstage = ReadRawData;
continue;
}

if( action & MsgHandler::More )
{
uint16_t action = strm.InspectStatusRsp( substrmnb,
inhandler );

if( action & MsgHandler::Corrupted )
return XRootDStatus( stError, errCorruptedHeader );

if( action & MsgHandler::Raw )
{
//------------------------------------------------------------
// The next step is to read the raw data
//------------------------------------------------------------
readstage = ReadRawData;
continue;
}

if( action & MsgHandler::More )
{
//------------------------------------------------------------
// The next step is to read the additional data in the message
// body
//------------------------------------------------------------
readstage = ReadMsgBody;
continue;
}
//--------------------------------------------------------------
// The next step is to read the additional data in the message
// body
//--------------------------------------------------------------
readstage = ReadMsgBody;
continue;
}
//------------------------------------------------------------

//----------------------------------------------------------------
// Unless we've got a kXR_status message and no handler the
// read is done
//----------------------------------------------------------------
ServerResponse *rsphdr = (ServerResponse *)inmsg->GetBuffer();
if( !( action & MsgHandler::RemoveHandler ) ||
rsphdr->hdr.status != kXR_status ||
inmsg->GetSize() < sizeof( ServerResponseStatus ) )
{
readstage = ReadDone;
continue;
}

//----------------------------------------------------------------
// There is no handler and we have a kXR_status message. If we
// have already read all the message then we're done.
//----------------------------------------------------------------
ServerResponseStatus *rspst = (ServerResponseStatus*)inmsg->GetBuffer();
const uint32_t hdrSize = rspst->hdr.dlen;
if( inmsg->GetSize() != hdrSize + 8 )
{
readstage = ReadDone;
continue;
}

//----------------------------------------------------------------
// Only the header of kXR_status has been read. Unmarshall the
// header and if there is more body data call GetBody() again.
//----------------------------------------------------------------
const uint16_t reqType = rspst->bdy.requestid + kXR_1stRequest;
st = XRootDTransport::UnMarshalStatusBody( *inmsg, reqType );

if( !st.IsOK() && st.code == errDataError )
{
log->Error( AsyncSockMsg, "[%s] Failed to unmarshall "
"corrupted status body in message 0x%x.",
strmname.c_str(), inmsg.get() );
return XRootDStatus( stError, errCorruptedHeader );
}
if( !st.IsOK() )
{
log->Error( AsyncSockMsg, "[%s] Failed to unmarshall "
"status body of message 0x%x.",
strmname.c_str(), inmsg.get() );
readstage = ReadDone;
continue;
}
if ( rspst->bdy.dlen != 0 )
{
readstage = ReadMsgBody;
continue;
}

//----------------------------------------------------------------
// The next step is to finalize the read
//------------------------------------------------------------
//----------------------------------------------------------------
readstage = ReadDone;
continue;
}
Expand Down
82 changes: 48 additions & 34 deletions src/XrdCl/XrdClStream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -467,24 +467,35 @@ namespace XrdCl
msg->SetSessionId( pSessionId );
pBytesReceived += bytesReceived;

uint32_t streamAction = pTransport->MessageReceived( *msg, subStream,
*pChannelData );
if( streamAction & TransportHandler::DigestMsg )
return;
MsgHandler *handler = nullptr;
uint16_t action = 0;
{
InMessageHelper &mh = pSubStreams[subStream]->inMsgHelper;
handler = mh.handler;
action = mh.action;
mh.Reset();
}

if( streamAction & TransportHandler::RequestClose )
if( !IsPartial( *msg ) )
{
RequestClose( *msg );
return;
uint32_t streamAction = pTransport->MessageReceived( *msg, subStream,
*pChannelData );
if( streamAction & TransportHandler::DigestMsg )
return;

if( streamAction & TransportHandler::RequestClose )
{
RequestClose( *msg );
return;
}
}

Log *log = DefaultEnv::GetLog();
InMessageHelper &mh = pSubStreams[subStream]->inMsgHelper;

//--------------------------------------------------------------------------
// No handler, we discard the message ...
//--------------------------------------------------------------------------
if( !mh.handler )
if( !handler )
{
ServerResponse *rsp = (ServerResponse*)msg->GetBuffer();
log->Warning( PostMasterMsg, "[%s] Discarding received message: 0x%x "
Expand All @@ -500,24 +511,22 @@ namespace XrdCl
log->Dump( PostMasterMsg, "[%s] Handling received message: 0x%x.",
pStreamName.c_str(), msg.get() );

if( mh.action & (MsgHandler::NoProcess|MsgHandler::Ignore) )
if( action & (MsgHandler::NoProcess|MsgHandler::Ignore) )
{
log->Dump( PostMasterMsg, "[%s] Ignoring the processing handler for: 0x%x.",
pStreamName.c_str(), msg->GetDescription().c_str() );

// if we are handling partial response we have to take down the timeout fence
if( IsPartial( *msg ) )
{
XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( mh.handler );
XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( handler );
if( xrdHandler ) xrdHandler->PartialReceived();
}

mh.Reset();
return;
}

Job *job = new HandleIncMsgJob( mh.handler );
mh.Reset();
Job *job = new HandleIncMsgJob( handler );
pJobManager->QueueJob( job );
}

Expand Down Expand Up @@ -811,12 +820,14 @@ namespace XrdCl
}

//--------------------------------------------------------------------------
// Reinsert the receiving handler
// Reinsert the receiving handler and reset any partially read partial
//--------------------------------------------------------------------------
if( pSubStreams[subStream]->inMsgHelper.handler )
{
InMessageHelper &h = pSubStreams[subStream]->inMsgHelper;
pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
if( xrdHandler ) xrdHandler->PartialReceived();
h.Reset();
}

Expand Down Expand Up @@ -894,9 +905,9 @@ namespace XrdCl
void Stream::ForceError( XRootDStatus status )
{
XrdSysMutexHelper scopedLock( pMutex );
Log *log = DefaultEnv::GetLog();
for( size_t substream = 0; substream < pSubStreams.size(); ++substream )
{
Log *log = DefaultEnv::GetLog();
if( pSubStreams[substream]->status != Socket::Connected ) continue;
pSubStreams[substream]->socket->Close();
pSubStreams[substream]->status = Socket::Disconnected;
Expand All @@ -915,35 +926,37 @@ namespace XrdCl
}

//--------------------------------------------------------------------
// Reinsert the receiving handler
// Reinsert the receiving handler and reset any partially read partial
//--------------------------------------------------------------------
if( pSubStreams[substream]->inMsgHelper.handler )
{
InMessageHelper &h = pSubStreams[substream]->inMsgHelper;
pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
if( xrdHandler ) xrdHandler->PartialReceived();
h.Reset();
}
}

pConnectionCount = 0;
pConnectionCount = 0;

//------------------------------------------------------------------------
// We're done here, unlock the stream mutex to avoid deadlocks and
// report the disconnection event to the handlers
//------------------------------------------------------------------------
log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
"message handlers.", pStreamName.c_str() );
//------------------------------------------------------------------------
// We're done here, unlock the stream mutex to avoid deadlocks and
// report the disconnection event to the handlers
//------------------------------------------------------------------------
log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
"message handlers.", pStreamName.c_str() );

SubStreamList::iterator it;
OutQueue q;
for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
q.GrabItems( *(*it)->outQueue );
scopedLock.UnLock();
SubStreamList::iterator it;
OutQueue q;
for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
q.GrabItems( *(*it)->outQueue );
scopedLock.UnLock();

q.Report( status );
q.Report( status );

pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
}
pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
}

//----------------------------------------------------------------------------
Expand Down Expand Up @@ -1117,7 +1130,8 @@ namespace XrdCl
MsgHandler *&incHandler )
{
InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
if( !mh.handler ) return false;
if( !mh.handler )
return MsgHandler::RemoveHandler;

uint16_t action = mh.handler->InspectStatusRsp();
mh.action |= action;
Expand Down

0 comments on commit 25e600b

Please sign in to comment.