Skip to content

Commit

Permalink
[XrdEc] Reader: make sure data block is not prematurely destroyed.
Browse files Browse the repository at this point in the history
  • Loading branch information
simonmichal committed Jan 14, 2021
1 parent 54087c0 commit 5a7d672
Showing 1 changed file with 48 additions and 41 deletions.
89 changes: 48 additions & 41 deletions src/XrdEc/XrdEcReader.cc
Expand Up @@ -131,35 +131,38 @@ namespace XrdEc
//-----------------------------------------------------------------------
// Read data from stripe
//
// @param self : the block_t object
// @param strpid : stripe ID
// @param offset : relative offset within the stripe
// @param size : number of bytes to be read from the stripe
// @param usrbuff : user buffer for the data
// @param usrcb : user callback to be notified when the read operation
// has been resolved
//-----------------------------------------------------------------------
void read( size_t strpid,
uint64_t offset,
uint32_t size,
char *usrbuff,
callback_t usrcb )
static void read( std::shared_ptr<block_t> &self,
size_t strpid,
uint64_t offset,
uint32_t size,
char *usrbuff,
callback_t usrcb )
{
std::unique_lock<std::mutex> lck( mtx );
std::unique_lock<std::mutex> lck( self->mtx );

//---------------------------------------------------------------------
// The cache is empty, we need to load the data
//---------------------------------------------------------------------
if( state[strpid] == Empty )
if( self->state[strpid] == Empty )
{
reader.Read( blkid, strpid, stripes[strpid], read_callback( strpid ) );
state[strpid] = Loading;
self->reader.Read( self->blkid, strpid, self->stripes[strpid],
read_callback( self, strpid ) );
self->state[strpid] = Loading;
}
//---------------------------------------------------------------------
// The stripe is either corrupted or unreachable
//---------------------------------------------------------------------
if( state[strpid] == Missing )
if( self->state[strpid] == Missing )
{
if( !error_correction() )
if( !error_correction( self ) )
{
//-----------------------------------------------------------------
// Recovery was not possible, notify the user of the error
Expand All @@ -175,19 +178,19 @@ namespace XrdEc
//---------------------------------------------------------------------
// The cache is loading or recovering, we don't have the data yet
//---------------------------------------------------------------------
if( state[strpid] == Loading || state[strpid] == Recovering )
if( self->state[strpid] == Loading || self->state[strpid] == Recovering )
{
pending[strpid].emplace_back( offset, size, usrbuff, usrcb );
self->pending[strpid].emplace_back( offset, size, usrbuff, usrcb );
return;
}
//---------------------------------------------------------------------
// We do have the data so we can serve the user right away
//---------------------------------------------------------------------
if( state[strpid] == Valid )
if( self->state[strpid] == Valid )
{
if( offset + size > stripes[strpid].size() )
size = stripes[strpid].size() - offset;
memcpy( usrbuff, stripes[strpid].data() + offset, size );
if( offset + size > self->stripes[strpid].size() )
size = self->stripes[strpid].size() - offset;
memcpy( usrbuff, self->stripes[strpid].data() + offset, size );
usrcb( XrdCl::XRootDStatus(), size );
return;
}
Expand All @@ -199,15 +202,18 @@ namespace XrdEc
}

//-----------------------------------------------------------------------
//
// If necessary, trigger the error correction procedure
// @param self : the block_t object
// @return : false if the block is corrupted and cannot be recovered,
// true otherwise
//-----------------------------------------------------------------------
bool error_correction()
static bool error_correction( std::shared_ptr<block_t> &self )
{
//---------------------------------------------------------------------
// Do the accounting for our stripes
//---------------------------------------------------------------------
size_t missingcnt = 0, validcnt = 0, loadingcnt = 0, recoveringcnt = 0;
std::for_each( state.begin(), state.end(), [&]( state_t &s )
std::for_each( self->state.begin(), self->state.end(), [&]( state_t &s )
{
switch( s )
{
Expand All @@ -226,17 +232,17 @@ namespace XrdEc
// Check if we can do the recovery at all (if too many stripes are
// missing it won't be possible)
//---------------------------------------------------------------------
if( missingcnt + recoveringcnt > objcfg.nbparity ) return false;
if( missingcnt + recoveringcnt > self->objcfg.nbparity ) return false;
//---------------------------------------------------------------------
// Check if we can do the recovery right away
//---------------------------------------------------------------------
if( validcnt >= objcfg.nbdata )
if( validcnt >= self->objcfg.nbdata )
{
Config &cfg = Config::Instance();
stripes_t strps( get_stripes() );
stripes_t strps( self->get_stripes() );
try
{
cfg.GetRedundancy( objcfg ).compute( strps );
cfg.GetRedundancy( self->objcfg ).compute( strps );
}
catch( const IOError &ex )
{
Expand All @@ -246,57 +252,58 @@ namespace XrdEc
// Now when we recovered the data we need to mark every stripe as
// valid and execute the pending reads
//-------------------------------------------------------------------
for( size_t strpid = 0; strpid < objcfg.nbchunks; ++strpid )
for( size_t strpid = 0; strpid < self->objcfg.nbchunks; ++strpid )
{
if( state[strpid] != Recovering ) continue;
state[strpid] = Valid;
carryout( pending[strpid], stripes[strpid] );
if( self->state[strpid] != Recovering ) continue;
self->state[strpid] = Valid;
self->carryout( self->pending[strpid], self->stripes[strpid] );
}
return true;
}
//---------------------------------------------------------------------
// Try loading the data and only then attempt recovery
//---------------------------------------------------------------------
size_t i = 0;
while( loadingcnt + validcnt < objcfg.nbdata && i < objcfg.nbchunks )
while( loadingcnt + validcnt < self->objcfg.nbdata && i < self->objcfg.nbchunks )
{
size_t strpid = i++;
if( state[strpid] != Empty ) continue;
reader.Read( blkid, strpid, stripes[strpid], read_callback( strpid ) );
state[strpid] = Loading;
if( self->state[strpid] != Empty ) continue;
self->reader.Read( self->blkid, strpid, self->stripes[strpid],
read_callback( self, strpid ) );
self->state[strpid] = Loading;
++loadingcnt;
}

//-------------------------------------------------------------------
// Now that we triggered the recovery procedure mark every missing
// stripe as recovering.
//-------------------------------------------------------------------
std::for_each( state.begin(), state.end(),
std::for_each( self->state.begin(), self->state.end(),
[]( state_t &s ){ if( s == Missing ) s = Recovering; } );
return true;
}

//-----------------------------------------------------------------------
// Get a callback for read operation
//-----------------------------------------------------------------------
inline
callback_t read_callback( size_t strpid )
inline static
callback_t read_callback( std::shared_ptr<block_t> &self, size_t strpid )
{
return [=]( const XrdCl::XRootDStatus &st, uint32_t )
return [self, strpid]( const XrdCl::XRootDStatus &st, uint32_t ) mutable
{
std::unique_lock<std::mutex> lck( mtx );
state[strpid] = st.IsOK() ? Valid : Missing;
std::unique_lock<std::mutex> lck( self->mtx );
self->state[strpid] = st.IsOK() ? Valid : Missing;
//------------------------------------------------------------
// Check if we need to do any error correction (either for
// the current stripe, or any other stripe)
//------------------------------------------------------------
bool recoverable = error_correction();
bool recoverable = error_correction( self );
//------------------------------------------------------------
// Carry out the pending read requests if we got the data or
// if there was an error and we cannot recover
//------------------------------------------------------------
if( st.IsOK() || !recoverable )
carryout( pending[strpid], stripes[strpid], st );
self->carryout( self->pending[strpid], self->stripes[strpid], st );
};
}

Expand Down Expand Up @@ -474,7 +481,7 @@ namespace XrdEc
//-------------------------------------------------------------------
// Read data from a stripe
//-------------------------------------------------------------------
block->read( strpid, rdoff, rdsize, usrbuff, callback );
block_t::read( block, strpid, rdoff, rdsize, usrbuff, callback );
//-------------------------------------------------------------------
// Update absolute offset, read length, and user buffer
//-------------------------------------------------------------------
Expand Down

0 comments on commit 5a7d672

Please sign in to comment.