Skip to content

Commit

Permalink
[XrdCl] Implement PgWrite, part 4.
Browse files Browse the repository at this point in the history
Iplement retransmission of corrupted pages.
  • Loading branch information
simonmichal authored and gganis committed Nov 23, 2021
1 parent 2ea4d6f commit 1900ad2
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 22 deletions.
140 changes: 128 additions & 12 deletions src/XrdCl/XrdClFileStateHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1379,18 +1379,6 @@ namespace XrdCl
ResponseHandler *handler,
uint16_t timeout )
{
XrdSysMutexHelper scopedLock( pMutex );

if( pFileState == Error ) return pStatus;

if( pFileState != Opened && pFileState != Recovering )
return XRootDStatus( stError, errInvalidOp );

Log *log = DefaultEnv::GetLog();
log->Debug( FileMsg, "[0x%x@%s] Sending a pgwrite command for handle 0x%x to "
"%s", this, pFileUrl->GetURL().c_str(),
*((uint32_t*)pFileHandle), pDataServer->GetHostId().c_str() );

//--------------------------------------------------------------------------
// Validate the digest vector size
//--------------------------------------------------------------------------
Expand All @@ -1406,6 +1394,133 @@ namespace XrdCl
return XRootDStatus( stError, errInvalidArgs, 0, "Wrong number of crc32c digests." );
}

//--------------------------------------------------------------------------
// Create a context for PgWrite operation
//--------------------------------------------------------------------------
struct pgwrt_t
{
pgwrt_t( ResponseHandler *h ) : handler( h ), status( nullptr )
{
}

~pgwrt_t()
{
// if all retries were succesful no error status was set
if( !status ) status = new XRootDStatus();
handler->HandleResponse( status, nullptr );
}

static size_t GetPgNb( uint64_t pgoff, uint64_t offset, uint32_t fstpglen )
{
if( pgoff == offset ) return 0; // we need this if statment because we operate on unsigned integers
return ( pgoff - ( offset + fstpglen ) ) / XrdSys::PageSize + 1;
}

inline void SetStatus( XRootDStatus* s )
{
if( !status ) status = s;
else delete s;
}

ResponseHandler *handler;
XRootDStatus *status;
};
auto pgwrt = std::make_shared<pgwrt_t>( handler );

int fLen, lLen;
XrdOucPgrwUtils::csNum( offset, size, fLen, lLen );
uint32_t fstpglen = fLen;

auto h = ResponseHandler::Wrap( [=]( auto* s, auto* r ) mutable
{
std::unique_ptr<AnyObject> scoped( r );
// if the request failed simply pass the status to the
// user handler
if( !s->IsOK() )
{
pgwrt->SetStatus( s );
return; // the destructor will call the handler
}
// also if the request was sucessful and there were no
// corrupted pages pass the status to the user handler
RetryInfo *inf = nullptr;
r->Get( inf );
if( !inf->NeedRetry() )
{
pgwrt->SetStatus( s );
return; // the destructor will call the handler
}
delete s;
// otherwise we need to retransmit the corrupted pages
for( size_t i = 0; i < inf->Size(); ++i )
{
auto tpl = inf->At( i );
uint64_t pgoff = std::get<0>( tpl );
uint32_t pglen = std::get<1>( tpl );
const void *pgbuf = static_cast<const char*>( buffer ) + ( pgoff - offset );
uint32_t pgdigest = cksums[pgwrt_t::GetPgNb( pgoff, offset, fstpglen )];
auto h = ResponseHandler::Wrap( [=]( auto *s, auto *r ) mutable
{
std::unique_ptr<AnyObject> scoped( r );
// if we failed simply set the status
if( !s->IsOK() )
{
pgwrt->SetStatus( s );
return; // the destructor will call the handler
}
delete s;
// otherwise check if the data were not corrupted again
RetryInfo *inf = nullptr;
r->Get( inf );
if( inf->NeedRetry() ) // so we failed in the end
pgwrt->SetStatus( new XRootDStatus( stError, errDataError, 0,
"Failed to retransmit corrupted page" ) );
} );
auto st = PgWriteRetry( pgoff, pglen, pgbuf, pgdigest, h, timeout /*TODO*/ );
if( !st.IsOK() ) pgwrt->SetStatus( new XRootDStatus( st ) );
}
} );

return PgWriteImpl( offset, size, buffer, cksums, 0, h, timeout );
}

//------------------------------------------------------------------------
// Write number of pages at a given offset - async
//------------------------------------------------------------------------
XRootDStatus FileStateHandler::PgWriteRetry( uint64_t offset,
uint32_t size,
const void *buffer,
uint32_t digest,
ResponseHandler *handler,
uint16_t timeout )
{
std::vector<uint32_t> cksums{ digest };
return PgWriteImpl( offset, size, buffer, cksums, PgReadFlags::Retry, handler, timeout );
}

//------------------------------------------------------------------------
// Write number of pages at a given offset - async
//------------------------------------------------------------------------
XRootDStatus FileStateHandler::PgWriteImpl( uint64_t offset,
uint32_t size,
const void *buffer,
std::vector<uint32_t> &cksums,
kXR_char flags,
ResponseHandler *handler,
uint16_t timeout )
{
XrdSysMutexHelper scopedLock( pMutex );

if( pFileState == Error ) return pStatus;

if( pFileState != Opened && pFileState != Recovering )
return XRootDStatus( stError, errInvalidOp );

Log *log = DefaultEnv::GetLog();
log->Debug( FileMsg, "[0x%x@%s] Sending a pgwrite command for handle 0x%x to "
"%s", this, pFileUrl->GetURL().c_str(),
*((uint32_t*)pFileHandle), pDataServer->GetHostId().c_str() );

//--------------------------------------------------------------------------
// Create the message
//--------------------------------------------------------------------------
Expand All @@ -1416,6 +1531,7 @@ namespace XrdCl
req->requestid = kXR_pgwrite;
req->offset = offset;
req->dlen = size + cksums.size() * sizeof( uint32_t );
req->reqflags = flags;
memcpy( req->fhandle, pFileHandle, 4 );

ChunkList *list = new ChunkList();
Expand Down
50 changes: 45 additions & 5 deletions src/XrdCl/XrdClFileStateHandler.hh
Original file line number Diff line number Diff line change
Expand Up @@ -309,11 +309,51 @@ namespace XrdCl
//! @return status of the operation
//------------------------------------------------------------------------
XRootDStatus PgWrite( uint64_t offset,
uint32_t size,
const void *buffer,
std::vector<uint32_t> &cksums,
ResponseHandler *handler,
uint16_t timeout = 0 );
uint32_t size,
const void *buffer,
std::vector<uint32_t> &cksums,
ResponseHandler *handler,
uint16_t timeout = 0 );

//------------------------------------------------------------------------
//! Write number of pages at a given offset - async
//!
//! @param offset offset from the beginning of the file
//! @param size buffer size
//! @param buffer a pointer to a buffer holding data pages
//! @param cksums the crc32c checksums for each 4KB page
//! @param handler handler to be notified when the response arrives
//! @param timeout timeout value, if 0 the environment default will be
//! used
//! @return status of the operation
//------------------------------------------------------------------------
XRootDStatus PgWriteRetry( uint64_t offset,
uint32_t size,
const void *buffer,
uint32_t digest,
ResponseHandler *handler,
uint16_t timeout = 0 );

//------------------------------------------------------------------------
//! Write number of pages at a given offset - async
//!
//! @param offset offset from the beginning of the file
//! @param size buffer size
//! @param buffer a pointer to a buffer holding data pages
//! @param cksums the crc32c checksums for each 4KB page
//! @param flags PgWrite flags
//! @param handler handler to be notified when the response arrives
//! @param timeout timeout value, if 0 the environment default will be
//! used
//! @return status of the operation
//------------------------------------------------------------------------
XRootDStatus PgWriteImpl( uint64_t offset,
uint32_t size,
const void *buffer,
std::vector<uint32_t> &cksums,
kXR_char flags,
ResponseHandler *handler,
uint16_t timeout = 0 );

//------------------------------------------------------------------------
//! Commit all pending disk writes - async
Expand Down
10 changes: 5 additions & 5 deletions src/XrdCl/XrdClXRootDMsgHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2233,13 +2233,13 @@ namespace XrdCl
else if( i == pgcnt - 1 ) len = cse->dlLast;
retries.push_back( std::make_tuple( pgoffs[i], len ) );
}

RetryInfo *info = new RetryInfo( std::move( retries ) );
AnyObject *obj = new AnyObject();
obj->Set( info );
response = obj;
}

RetryInfo *info = new RetryInfo( std::move( retries ) );
AnyObject *obj = new AnyObject();
obj->Set( info );
response = obj;

return Status();
}

Expand Down

0 comments on commit 1900ad2

Please sign in to comment.