Skip to content

Commit

Permalink
[XrdEc] Do error correction on failed read.
Browse files Browse the repository at this point in the history
  • Loading branch information
simonmichal committed Jan 7, 2021
1 parent cacfbf7 commit 60f1827
Showing 1 changed file with 191 additions and 52 deletions.
243 changes: 191 additions & 52 deletions src/XrdEc/XrdEcReader.hh
Expand Up @@ -24,6 +24,7 @@
#include <algorithm>
#include <iterator>
#include <list>
#include <numeric>

namespace XrdEc
{
Expand Down Expand Up @@ -101,10 +102,13 @@ namespace XrdEc
//-------------------------------------------------------------------------
struct block_t
{
typedef std::tuple<uint64_t, uint32_t, char*, callback_t> args_t;
typedef std::vector<args_t> pending_t;

//-----------------------------------------------------------------------
// Stripe state: empty / loading / valid / missing / recovering
//-----------------------------------------------------------------------
enum state_t { Empty = 0, Loading, Valid };
enum state_t { Empty = 0, Loading, Valid, Missing, Recovering };

//-----------------------------------------------------------------------
// Constructor
Expand All @@ -114,7 +118,8 @@ namespace XrdEc
stripes( objcfg.nbchunks ),
state( objcfg.nbchunks, Empty ),
pending( objcfg.nbchunks ),
blkid( blkid )
blkid( blkid ),
recovering( 0 )
{
}

Expand All @@ -141,56 +146,33 @@ namespace XrdEc
//---------------------------------------------------------------------
if( state[strpid] == Empty )
{
//-------------------------------------------------------------------
// Prepare the read callback
//-------------------------------------------------------------------
auto cb = [=]( const XrdCl::XRootDStatus &st, uint32_t )
{
std::unique_lock<std::mutex> lck( mtx );
pending_t &p = pending[strpid];
auto itr = p.begin();
//---------------------------------------------------------------
// Iterate over all pending read operations for given stripe
//---------------------------------------------------------------
for( ; itr != p.end() ; ++itr )
{
auto &args = *itr;
callback_t &callback = std::get<3>( args );
uint32_t nbrd = 0; // number of bytes read
//-------------------------------------------------------------
// If the read was successful, copy the data to user buffer
//-------------------------------------------------------------
if( st.IsOK() )
{
uint64_t offset = std::get<0>( args );
uint32_t size = std::get<1>( args );
char *usrbuff = std::get<2>( args );
if( offset + size > stripes[strpid].size() )
size = stripes[strpid].size() - offset;
memcpy( usrbuff, stripes[strpid].data() + offset, size );
nbrd = size;
}
//-------------------------------------------------------------
// Call the user callback
//-------------------------------------------------------------
callback( st, nbrd );
}
};

reader.Read( blkid, strpid, stripes[strpid], cb );
reader.Read( blkid, strpid, stripes[strpid], read_callback( strpid ) );
state[strpid] = Loading;
}
//---------------------------------------------------------------------
// The cache is loading, we don't have the data yet
// The stripe is either corrupted or unreachable
//---------------------------------------------------------------------
if( state[strpid] == Loading )
if( state[strpid] == Missing )
{
auto cb = [=]( const XrdCl::XRootDStatus &st, uint32_t nbrd )
if( !error_correction() )
{
if( st.IsOK() ) state[strpid] = Valid;
usrcb( st, nbrd );
};
pending[strpid].emplace_back( offset, size, usrbuff, cb );
//-----------------------------------------------------------------
// Recovery was not possible, notify the user of the error
//-----------------------------------------------------------------
usrcb( XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errDataError ), 0 );
return;
}
//-------------------------------------------------------------------
// we fall through to the following if-statements that will handle
// Recovering / Valid state
//-------------------------------------------------------------------
}
//---------------------------------------------------------------------
// The cache is loading or recovering, we don't have the data yet
//---------------------------------------------------------------------
if( state[strpid] == Loading || state[strpid] == Recovering )
{
pending[strpid].emplace_back( offset, size, usrbuff, usrcb );
return;
}
//---------------------------------------------------------------------
Expand All @@ -211,15 +193,172 @@ namespace XrdEc
usrcb( XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errInvalidOp ), 0 );
}

typedef std::tuple<uint64_t, uint32_t, char*, callback_t> args_t;
typedef std::vector<args_t> pending_t;
//-----------------------------------------------------------------------
//
//-----------------------------------------------------------------------
bool error_correction()
{
//---------------------------------------------------------------------
// Check if we can do the recovery at all (if too many stripes are
// missing it wont be possible)
//---------------------------------------------------------------------
size_t missingcnt = std::accumulate( state.begin(), state.end(), 0,
[]( size_t cnt, state_t st )
{
if( st == Missing ) return cnt + 1;
return cnt;
} );
if( missingcnt > objcfg.nbparity ) return false;
//---------------------------------------------------------------------
// If there are no missing stripes all is good ...
//---------------------------------------------------------------------
if( missingcnt == 0 ) return true;

//---------------------------------------------------------------------
// Check if we can do the recovery right away
//---------------------------------------------------------------------
size_t validcnt = std::accumulate( state.begin(), state.end(), 0,
[]( size_t cnt, state_t st )
{
if( st == Valid ) return cnt + 1;
return cnt;
} );
if( validcnt >= objcfg.nbdata )
{
Config &cfg = Config::Instance();
stripes_t strps( get_stripes() );
try
{
cfg.GetRedundancy( objcfg ).compute( strps );
}
catch( const IOError &ex )
{
return false;
}
//-------------------------------------------------------------------
// Now when we recovered the data we need to mark every stripe as
// valid.
//-------------------------------------------------------------------
std::for_each( state.begin(), state.end(), []( state_t &s ){ s = Valid; } );
return true;
}

//---------------------------------------------------------------------
// Try loading the data and only then attempt recovery
//---------------------------------------------------------------------
size_t loadingcnt = std::accumulate( state.begin(), state.end(), 0,
[]( size_t cnt, state_t st )
{
if( st == Loading ) return cnt + 1;
return cnt;
} );
size_t i = 0;
while( loadingcnt + validcnt < objcfg.nbdata )
{
size_t strpid = i++;
if( state[strpid] != Empty ) continue;
reader.Read( blkid, strpid, stripes[strpid], read_callback( strpid ) );
state[strpid] = Loading;
++loadingcnt;
}

//-------------------------------------------------------------------
// Now that we triggered the recovery procedure mark every missing
// stripe as recovering.
//-------------------------------------------------------------------
std::for_each( state.begin(), state.end(),
[]( state_t &s ){ if( s == Missing ) s = Recovering; } );
return true;
}

//-----------------------------------------------------------------------
// Get a callback for read operation
//
// The returned callback records the outcome of the read of stripe
// 'strpid', gives error correction a chance to run and then executes the
// read requests queued on that stripe.
//
// NOTE: the callback captures 'this', hence the block has to stay alive
//       until the read operation has completed.
//
// @param strpid : ID of the stripe the read has been issued for
// @return       : the callback to be passed to the read operation
//-----------------------------------------------------------------------
inline
callback_t read_callback( size_t strpid )
{
  return [this, strpid]( const XrdCl::XRootDStatus &st, uint32_t )
  {
    std::unique_lock<std::mutex> lck( mtx );
    state[strpid] = st.IsOK() ? Valid : Missing;
    //------------------------------------------------------------
    // Check if we need to do any error correction (either for
    // the current stripe, or any other stripe)
    //------------------------------------------------------------
    bool recoverable = error_correction();
    //------------------------------------------------------------
    // Carry out the pending read requests if we got the data or
    // if there was an error and we cannot recover
    //------------------------------------------------------------
    if( st.IsOK() || !recoverable )
    {
      carryout( pending[strpid], stripes[strpid], st );
      pending[strpid].clear();
    }
    //------------------------------------------------------------
    // The read failed but error correction has already restored
    // the stripe: the data is there, so serve the pending read
    // requests with a success status instead of leaving them
    // queued forever
    //------------------------------------------------------------
    else if( state[strpid] == Valid )
    {
      carryout( pending[strpid], stripes[strpid] );
      pending[strpid].clear();
    }
    //------------------------------------------------------------
    // Otherwise the stripe is being recovered, the pending read
    // requests stay queued until the recovery has finished
    //------------------------------------------------------------
  };
}

//-----------------------------------------------------------------------
// Build the stripes_t structure that is handed over to the redundancy
// provider for error recovery.
//
// There is one entry per stripe of the block: a pointer to the stripe
// buffer plus a flag that is true only if the stripe is in Valid state.
// The buffer of every stripe that is not Valid is resized to the chunk
// size (new bytes are zeroed) before its pointer is taken.
//
// @return : the per-stripe (buffer, flag) entries
//-----------------------------------------------------------------------
inline stripes_t get_stripes()
{
  const size_t total = objcfg.nbchunks;
  stripes_t result;
  result.reserve( total );
  for( size_t idx = 0; idx != total; ++idx )
  {
    const bool usable = ( state[idx] == Valid );
    //-------------------------------------------------------------------
    // A stripe without valid data needs a full-size buffer
    //-------------------------------------------------------------------
    if( !usable )
      stripes[idx].resize( objcfg.chunksize, 0 );
    result.emplace_back( stripes[idx].data(), usable );
  }
  return result;
}

//-----------------------------------------------------------------------
// Execute the pending read requests
//
// For every queued request the data is copied from the stripe buffer to
// the user buffer (only if the status is OK) and the user callback is
// called with the status and the number of bytes copied. Once served,
// the requests are removed from the queue so they cannot be executed a
// second time.
//
// @param pending : the queued read requests (offset, size, user buffer,
//                  user callback); empty on return
// @param stripe  : buffer holding the stripe data
// @param st      : status to be reported to the user callbacks
//-----------------------------------------------------------------------
inline static
void carryout( pending_t &pending,
const buffer_t &stripe,
const XrdCl::XRootDStatus &st = XrdCl::XRootDStatus() )
{
  //---------------------------------------------------------------
  // Take the requests out of the queue first: this way the queue is
  // left empty and a callback that queues a new request does not
  // invalidate the iteration below
  //---------------------------------------------------------------
  pending_t requests;
  requests.swap( pending );
  //---------------------------------------------------------------
  // Iterate over all pending read operations for given stripe
  //---------------------------------------------------------------
  for( auto &args : requests )
  {
    callback_t &callback = std::get<3>( args );
    uint32_t nbrd = 0; // number of bytes read
    //-------------------------------------------------------------
    // If the read was successful, copy the data to user buffer
    //-------------------------------------------------------------
    if( st.IsOK() )
    {
      const uint64_t offset = std::get<0>( args );
      uint32_t size = std::get<1>( args );
      char *usrbuff = std::get<2>( args );
      //-----------------------------------------------------------
      // Clamp the request to the data we actually have; a request
      // starting at or past the end of the stripe yields 0 bytes
      // (without the first check the subtraction would wrap around)
      //-----------------------------------------------------------
      if( offset >= stripe.size() )
        size = 0;
      else if( offset + size > stripe.size() )
        size = static_cast<uint32_t>( stripe.size() - offset );
      if( size > 0 )
        memcpy( usrbuff, stripe.data() + offset, size );
      nbrd = size;
    }
    //-------------------------------------------------------------
    // Call the user callback
    //-------------------------------------------------------------
    callback( st, nbrd );
  }
}

Reader &reader;
ObjCfg &objcfg;
std::vector<buffer_t> stripes; //< data buffer for every stripe
std::vector<state_t> state; //< state of every data buffer (empty/loading/valid)
std::vector<pending_t> pending; //< pending reads per stripe
size_t blkid; //< block ID
std::vector<buffer_t> stripes; //< data buffer for every stripe
std::vector<state_t> state; //< state of every data buffer (empty/loading/valid)
std::vector<pending_t> pending; //< pending reads per stripe
size_t blkid; //< block ID
bool recovering; //< true if we are in the process of recovering data, false otherwise
std::mutex mtx;
};

Expand Down

0 comments on commit 60f1827

Please sign in to comment.