Skip to content

Commit

Permalink
[XrdCl] Implement PgWrite, part 2.
Browse files Browse the repository at this point in the history
Parse and unmarshal the correction-segment in case it is present.
  • Loading branch information
simonmichal authored and gganis committed Nov 23, 2021
1 parent 2ed3cdf commit 0e8221e
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 46 deletions.
3 changes: 3 additions & 0 deletions src/XrdCl/XrdClAsyncSocketHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,9 @@ namespace XrdCl
pIncHandler.second = true;
repeat = true;
}

if( action & IncomingMsgHandler::More )
repeat = true; // for pgwrite we might have additional non-raw data
}
}
while( repeat );
Expand Down
3 changes: 2 additions & 1 deletion src/XrdCl/XrdClPostMasterInterfaces.hh
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,10 @@ namespace XrdCl
NoProcess = 0x0010, //!< don't call the processing callback
//!< even if the message belongs to this
//!< handler
Corrupted = 0x0020 //!< the handler discovered that the message
Corrupted = 0x0020, //!< the handler discovered that the message
//!< header is corrupted, we will have to
//!< tear down the socket
More = 0x0040 //!< there are more (non-raw) data to be read
};

//------------------------------------------------------------------------
Expand Down
3 changes: 3 additions & 0 deletions src/XrdCl/XrdClStream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1144,6 +1144,9 @@ namespace XrdCl
if( action & IncomingMsgHandler::Corrupted )
return IncomingMsgHandler::Corrupted;

if( action & IncomingMsgHandler::More )
return IncomingMsgHandler::More;

return IncomingMsgHandler::None;
}

Expand Down
92 changes: 50 additions & 42 deletions src/XrdCl/XrdClXRootDMsgHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -341,59 +341,37 @@ namespace XrdCl
if( msg->GetSize() < sizeof( ServerResponseStatus ) )
return Ignore;

//--------------------------------------------------------------------------
// Calculate the crc32c before the unmarshaling the body!
//--------------------------------------------------------------------------
ServerResponseStatus *rspst = (ServerResponseStatus*)msg->GetBuffer();
char *buffer = msg->GetBuffer( 8 + sizeof( rspst->bdy.crc32c ) );
size_t length = rspst->hdr.dlen - sizeof( rspst->bdy.crc32c );
uint32_t crcval = XrdOucCRC::Calc32C( buffer, length );

//--------------------------------------------------------------------------
// Unmarshal the status body
//--------------------------------------------------------------------------
ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
uint16_t reqId = ntohs( req->header.requestid );
XRootDStatus st = XRootDTransport::UnMarshalStatusBody( msg, reqId );
if( !st.IsOK() )
{
log->Error( XRootDMsg, "[%s] Failed to unmarshall status body.",
pUrl.GetHostId().c_str() );
pStatus = st;
HandleRspOrQueue();
return Ignore;
}

//--------------------------------------------------------------------------
// Do the integrity checks
// Unmarshal the status body
//--------------------------------------------------------------------------
if( crcval != rspst->bdy.crc32c )
if( !pRspStatusBodyUnMarshaled )
{
log->Error( XRootDMsg, "[%s] kXR_status response header corrupted "
"(crc32c integrity check failed).", pUrl.GetHostId().c_str() );
return Corrupted;
}

if( rspst->hdr.streamid[0] != rspst->bdy.streamID[0] ||
rspst->hdr.streamid[1] != rspst->bdy.streamID[1] )
{
log->Error( XRootDMsg, "[%s] kXR_status response header corrupted "
"(stream ID mismatch).", pUrl.GetHostId().c_str() );
return Corrupted;
}

XRootDStatus st = XRootDTransport::UnMarshalStatusBody( msg, reqId );

if( !st.IsOK() && st.code == errDataError )
{
log->Error( XRootDMsg, "[%s] %s", pUrl.GetHostId().c_str(),
st.GetErrorMessage().c_str() );
return Corrupted;
}

if( rspst->bdy.requestid + kXR_1stRequest != reqId )
{
log->Error( XRootDMsg, "[%s] kXR_status response header corrupted "
"(request ID mismatch).", pUrl.GetHostId().c_str() );
return Corrupted;
if( !st.IsOK() )
{
log->Error( XRootDMsg, "[%s] Failed to unmarshall status body.",
pUrl.GetHostId().c_str() );
pStatus = st;
HandleRspOrQueue();
return Ignore;
}
pRspStatusBodyUnMarshaled = true;
}

//--------------------------------------------------------------------------
// Common handling for partial results
//--------------------------------------------------------------------------
ServerResponseStatus *rspst = (ServerResponseStatus*)msg->GetBuffer();
if( rspst->bdy.resptype == XrdProto::kXR_PartialResult )
{
pResponse = 0;
Expand Down Expand Up @@ -427,6 +405,36 @@ namespace XrdCl
else
action |= RemoveHandler;
}
else if( reqId == kXR_pgwrite )
{
// if data corruption has been detected on the server side we will
// send some additional data pointing to the pages that need to be
// retransmitted
if( size_t( sizeof( ServerResponseHeader ) + rspst->hdr.dlen + rspst->bdy.dlen ) >
msg->GetCursor() )
action |= More;
// if we already have this data we need to unmarshal it
else if( !pRspPgWrtRetrnsmReqUnMarshalled )
{
XRootDStatus st = XRootDTransport::UnMarchalStatusCSE( msg );
if( !st.IsOK() && st.code == errDataError )
{
log->Error( XRootDMsg, "[%s] %s", pUrl.GetHostId().c_str(),
st.GetErrorMessage().c_str() );
return Corrupted;
}

if( !st.IsOK() )
{
log->Error( XRootDMsg, "[%s] Failed to unmarshall status body.",
pUrl.GetHostId().c_str() );
pStatus = st;
HandleRspOrQueue();
return Ignore;
}
pRspPgWrtRetrnsmReqUnMarshalled = true;
}
}

return action;
}
Expand Down Expand Up @@ -1817,7 +1825,7 @@ namespace XrdCl
case kXR_close:
case kXR_write:
case kXR_writev:
case kXR_pgwrite:
case kXR_pgwrite: // TODO
case kXR_sync:
case kXR_chkpoint:
return Status();
Expand Down
12 changes: 11 additions & 1 deletion src/XrdCl/XrdClXRootDMsgHandler.hh
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,10 @@ namespace XrdCl

pCV( 0 ),

pSslErrCnt( 0 )
pSslErrCnt( 0 ),

pRspStatusBodyUnMarshaled( false ),
pRspPgWrtRetrnsmReqUnMarshalled( false )
{
pPostMaster = DefaultEnv::GetPostMaster();
if( msg->GetSessionId() )
Expand Down Expand Up @@ -787,6 +790,13 @@ namespace XrdCl
// Count of consecutive `errTlsSslError` errors
//------------------------------------------------------------------------
size_t pSslErrCnt;

//------------------------------------------------------------------------
// Keep track if respective parts of kXR_status response have been
// unmarshaled.
//------------------------------------------------------------------------
bool pRspStatusBodyUnMarshaled;
bool pRspPgWrtRetrnsmReqUnMarshalled;
};
}

Expand Down
99 changes: 97 additions & 2 deletions src/XrdCl/XrdClXRootDTransport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "XrdSys/XrdSysPlatform.hh"
#include "XrdOuc/XrdOucErrInfo.hh"
#include "XrdOuc/XrdOucUtils.hh"
#include "XrdOuc/XrdOucCRC.hh"
#include "XrdSys/XrdSysTimer.hh"
#include "XrdSys/XrdSysAtomics.hh"
#include "XrdSys/XrdSysPlugin.hh"
Expand Down Expand Up @@ -333,7 +334,7 @@ namespace XrdCl
bodySize = rsphdr->dlen;
else
{
size_t stlen = sizeof( ServerResponseStatus ) + sizeof( ServerResponseBody_pgRead );
size_t stlen = sizeof( ServerResponseStatus ); // we read everything up to the offset
if( message->GetCursor() < stlen )
bodySize = rsphdr->dlen;
else
Expand All @@ -359,6 +360,7 @@ namespace XrdCl
leftToBeRead -= bytesRead;
message->AdvanceCursor( bytesRead );
}

return XRootDStatus( stOK, suDone );
}

Expand Down Expand Up @@ -1230,6 +1232,14 @@ namespace XrdCl
//------------------------------------------------------------------------
XRootDStatus XRootDTransport::UnMarshalStatusBody( Message *msg, uint16_t reqType )
{
//--------------------------------------------------------------------------
// Calculate the crc32c before the unmarshaling the body!
//--------------------------------------------------------------------------
ServerResponseStatus *rspst = (ServerResponseStatus*)msg->GetBuffer();
char *buffer = msg->GetBuffer( 8 + sizeof( rspst->bdy.crc32c ) );
size_t length = rspst->hdr.dlen - sizeof( rspst->bdy.crc32c );
uint32_t crcval = XrdOucCRC::Calc32C( buffer, length );

size_t stlen = sizeof( ServerResponseStatus );
switch( reqType )
{
Expand All @@ -1238,12 +1248,17 @@ namespace XrdCl
stlen += sizeof( ServerResponseBody_pgRead );
break;
}

case kXR_pgwrite:
{
stlen += sizeof( ServerResponseBody_pgWrite );
break;
}
}

if( msg->GetSize() < stlen ) return XRootDStatus( stError, errInvalidMessage, 0,
"kXR_status: invalid message size." );

ServerResponseStatus *rspst = (ServerResponseStatus*)msg->GetBuffer();
rspst->bdy.crc32c = ntohl( rspst->bdy.crc32c );
rspst->bdy.dlen = ntohl( rspst->bdy.dlen );

Expand All @@ -1255,8 +1270,88 @@ namespace XrdCl
pgrdbdy->offset = ntohll( pgrdbdy->offset );
break;
}

case kXR_pgwrite:
{
ServerResponseBody_pgWrite *pgwrtbdy = (ServerResponseBody_pgWrite*)msg->GetBuffer( sizeof( ServerResponseStatus ) );
pgwrtbdy->offset = ntohll( pgwrtbdy->offset );
break;
}
}

//--------------------------------------------------------------------------
// Do the integrity checks
//--------------------------------------------------------------------------
if( crcval != rspst->bdy.crc32c )
{
return XRootDStatus( stError, errDataError, 0, "kXR_status response header "
"corrupted (crc32c integrity check failed)." );
}

if( rspst->hdr.streamid[0] != rspst->bdy.streamID[0] ||
rspst->hdr.streamid[1] != rspst->bdy.streamID[1] )
{
return XRootDStatus( stError, errDataError, 0, "response header corrupted "
"(stream ID mismatch)." );
}



if( rspst->bdy.requestid + kXR_1stRequest != reqType )
{
return XRootDStatus( stError, errDataError, 0, "kXR_status response header corrupted "
"(request ID mismatch)." );
}

return XRootDStatus();
}

//----------------------------------------------------------------------------
// Unmarshall the correction-segment of the status response for pgwrite
//----------------------------------------------------------------------------
XRootDStatus XRootDTransport::UnMarchalStatusCSE( Message *msg )
{
ServerResponseV2 *rsp = (ServerResponseV2*)msg->GetBuffer();
//--------------------------------------------------------------------------
// If there's no additional data there's nothing to unmarshal
//--------------------------------------------------------------------------
if( rsp->status.bdy.dlen == 0 ) return XRootDStatus();
//--------------------------------------------------------------------------
// If there's not enough data to form correction-segment report an error
//--------------------------------------------------------------------------
if( size_t( rsp->status.bdy.dlen ) < sizeof( ServerResponseBody_pgWrCSE ) )
return XRootDStatus( stError, errInvalidMessage, 0,
"kXR_status: invalid message size." );

//--------------------------------------------------------------------------
// Calculate the crc32c for the additional data
//--------------------------------------------------------------------------
ServerResponseBody_pgWrCSE *cse = (ServerResponseBody_pgWrCSE*)msg->GetBuffer( sizeof( ServerResponseV2 ) );
cse->cseCRC = ntohl( cse->cseCRC );
size_t length = rsp->status.bdy.dlen - sizeof( uint32_t );
void* buffer = msg->GetBuffer( sizeof( ServerResponseV2 ) + sizeof( uint32_t ) );
uint32_t crcval = XrdOucCRC::Calc32C( buffer, length );

//--------------------------------------------------------------------------
// Do the integrity checks
//--------------------------------------------------------------------------
if( crcval != cse->cseCRC )
{
return XRootDStatus( stError, errDataError, 0, "kXR_status response header "
"corrupted (crc32c integrity check failed)." );
}

cse->dlFirst = ntohs( cse->dlFirst );
cse->dlLast = ntohs( cse->dlLast );

size_t pgcnt = ( rsp->status.bdy.dlen - sizeof( ServerResponseBody_pgWrCSE ) ) /
sizeof( kXR_int64 );
kXR_int64 *pgoffs = (kXR_int64*)msg->GetBuffer( sizeof( ServerResponseV2 ) +
sizeof( ServerResponseBody_pgWrCSE ) );

for( size_t i = 0; i < pgcnt; ++i )
pgoffs[i] = ntohll( pgoffs[i] );

return XRootDStatus();
}

Expand Down
5 changes: 5 additions & 0 deletions src/XrdCl/XrdClXRootDTransport.hh
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,11 @@ namespace XrdCl
//------------------------------------------------------------------------
static XRootDStatus UnMarshalStatusBody( Message *msg, uint16_t reqType );

//------------------------------------------------------------------------
//! Unmarshall the correction-segment of the status response for pgwrite
//------------------------------------------------------------------------
static XRootDStatus UnMarchalStatusCSE( Message *msg );

//------------------------------------------------------------------------
//! Unmarshall the header incoming message
//------------------------------------------------------------------------
Expand Down

0 comments on commit 0e8221e

Please sign in to comment.