Skip to content

Commit

Permalink
[XrdEc] Implement block caching for random reads.
Browse files Browse the repository at this point in the history
  • Loading branch information
simonmichal committed Jan 5, 2021
1 parent b6ed338 commit 6015396
Showing 1 changed file with 180 additions and 80 deletions.
260 changes: 180 additions & 80 deletions src/XrdEc/XrdEcReader.hh
Expand Up @@ -94,88 +94,128 @@ namespace XrdEc
//-------------------------------------------------------------------------
// Read callback, to be called with status and number of bytes read
//-------------------------------------------------------------------------
typedef std::function<void( const XrdCl::XRootDStatus&, buffer_t&& )> callback_t;
typedef std::function<void( const XrdCl::XRootDStatus&, uint32_t )> callback_t;

struct CacheEntry
//-------------------------------------------------------------------------
// A single data block
//-------------------------------------------------------------------------
struct block_t
{
CacheEntry( ObjCfg &objcfg ) : objcfg( objcfg ), evict( false ), blknb( 0 ), strpnb( 0 ), state( Empty )
//-----------------------------------------------------------------------
// Stripe state: empty / loading / valid
//-----------------------------------------------------------------------
enum state_t { Empty = 0, Loading, Valid };

//-----------------------------------------------------------------------
// Constructor
//-----------------------------------------------------------------------
block_t( size_t blkid, Reader &reader, ObjCfg &objcfg ) : reader( reader ),
objcfg( objcfg ),
stripes( objcfg.nbchunks ),
state( objcfg.nbchunks, Empty ),
pending( objcfg.nbchunks ),
blkid( blkid )
{
}

void Read( size_t offset,
size_t size,
char *usrbuff,
std::function<void( const XrdCl::XRootDStatus& )> callback )
//-----------------------------------------------------------------------
// Read data from stripe
//
// @param strpid : stripe ID
// @param offset : relative offset within the stripe
// @param size : number of bytes to be read from the stripe
// @param usrbuff : user buffer for the data
// @param callback : user callback to be notified when the read operation
// has been resolved
//-----------------------------------------------------------------------
void read( size_t strpid,
uint64_t offset,
uint32_t size,
char *usrbuff,
callback_t callback )
{
std::unique_lock<std::mutex> lck( mtx );
if( state != Valid )

//---------------------------------------------------------------------
// The cache is empty, we need to load the data
//---------------------------------------------------------------------
if( state[strpid] == Empty )
{
//-------------------------------------------------------------------
// Prepare the read callback
//-------------------------------------------------------------------
auto cb = [=]( const XrdCl::XRootDStatus &st, uint32_t )
{
std::unique_lock<std::mutex> lck( mtx );
pending_t &p = pending[strpid];
auto itr = p.begin();
//---------------------------------------------------------------
// Iterate over all pending read operations for given stripe
//---------------------------------------------------------------
for( ; itr != p.end() ; ++itr )
{
auto &args = *itr;
callback_t &callback = std::get<3>( args );
uint32_t nbrd = 0; // number of bytes read
//-------------------------------------------------------------
// If the read was successful, copy the data to user buffer
//-------------------------------------------------------------
if( st.IsOK() )
{
uint64_t offset = std::get<0>( args );
uint32_t size = std::get<1>( args );
char *usrbuff = std::get<2>( args );
if( offset + size > stripes[strpid].size() )
size = stripes[strpid].size() - offset;
memcpy( usrbuff, stripes[strpid].data() + offset, size );
nbrd = size;
}
//-------------------------------------------------------------
// Call the user callback
//-------------------------------------------------------------
callback( st, nbrd );
}
};

reader.Read( blkid, strpid, stripes[strpid], cb );
state[strpid] = Loading;
}
//---------------------------------------------------------------------
// The cache is loading, we don't have the data yet
//---------------------------------------------------------------------
if( state[strpid] == Loading )
{
callbacks.emplace_back( offset, size, usrbuff, std::move( callback ) );
pending[strpid].emplace_back( offset, size, usrbuff, callback );
return;
}
// if there was an error, forward it to the callback
if( !status.IsOK() )
//---------------------------------------------------------------------
// We do have the data so we can serve the user right away
//---------------------------------------------------------------------
if( state[strpid] == Valid )
{
lck.unlock();
callback( status );
if( offset + size > stripes[strpid].size() )
size = stripes[strpid].size() - offset;
memcpy( usrbuff, stripes[strpid].data() + offset, size );
callback( XrdCl::XRootDStatus(), size );
return;
}
// otherwise copy the data into user buffer
memcpy( usrbuff, buffer.data() + offset, size );
lck.unlock();
callback( status );
//---------------------------------------------------------------------
// In principle we should never end up here, nevertheless if this
// happens it is clearly an error ...
//---------------------------------------------------------------------
callback( XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errInvalidOp ), 0 );
}

void Load( size_t blknb, size_t strpnb )
{
std::unique_lock<std::mutex> lck( mtx );
if( state != Empty ) return;
this->blknb = blknb;
this->strpnb = strpnb;
// TODO load the data from remote endpoint and verify integrity
// if necessary run recovery

}
typedef std::tuple<uint64_t, uint32_t, char*, callback_t> args_t;
typedef std::vector<args_t> pending_t;

enum state_t { Empty = 0, Loading, Valid };


typedef std::function<void( const XrdCl::XRootDStatus& )> callback_t;
typedef std::tuple<size_t, size_t, char*, callback_t> args_t;

ObjCfg &objcfg;
buffer_t buffer;
bool evict;
size_t blknb;
size_t strpnb;
state_t state;
XrdCl::XRootDStatus status;
std::list<args_t> callbacks;
std::mutex mtx;
};


struct ReadCache
{
ReadCache( ObjCfg &objcfg ): objcfg( objcfg ) { }

// CacheEntry& Get( size_t blknb, size_t strpnb )
// {
// auto itr = cache.find( blknb );
// if( itr == cache.end() )
// {
// itr = cache.emplace( blknb ).first;
// itr->second.resize( objcfg.nbchunks, objcfg );
// }
// std::vector<CacheEntry> &block = itr->second;
// block[strpnb].Load( blknb, strpnb );
// return block[strpnb];
// }

typedef std::unordered_map<size_t, std::vector<CacheEntry>> cache_t;

ObjCfg &objcfg;
cache_t cache;
Reader &reader;
ObjCfg &objcfg;
std::vector<buffer_t> stripes; //< data buffer for every stripe
std::vector<state_t> state; //< state of every data buffer (empty/loading/valid)
std::vector<pending_t> pending; //< pending reads per stripe
size_t blkid; //< block ID
std::mutex mtx;
};

public:
Expand Down Expand Up @@ -225,36 +265,95 @@ namespace XrdEc

void Read( uint64_t offset, uint32_t length, void *buffer, XrdCl::ResponseHandler *handler )
{
// TODO
char *usrbuff = reinterpret_cast<char*>( buffer );
typedef std::tuple<uint64_t, uint32_t, void*, uint32_t, XrdCl::ResponseHandler*, std::mutex> rdctx_t;
auto rdctx = std::make_shared<rdctx_t>( offset, 0, buffer, length, handler );

while( length > 0 )
{
size_t blkid = offset / objcfg.datasize; //< ID of the block from which we will be reading
size_t strpid = ( offset % objcfg.datasize ) / objcfg.chunksize; //< ID of the stripe from which we will be reading
uint64_t rdoff = offset - blkid * objcfg.datasize - strpid * objcfg.chunksize; //< relative read offset within the stripe
uint32_t rdsize = objcfg.chunksize - rdoff; //< read size within the stripe
if( rdsize > length ) rdsize = length;
//-------------------------------------------------------------------
// Make sure we operate on a valid block
//-------------------------------------------------------------------
if( !block || block->blkid != blkid )
block = std::make_shared<block_t>( blkid, *this, objcfg );
//-------------------------------------------------------------------
// Prepare the callback for reading from single stripe
//-------------------------------------------------------------------
auto blk = block;
auto callback = [blk, rdctx, rdsize]( const XrdCl::XRootDStatus &st, uint32_t nbrd )
{
std::unique_lock<std::mutex> lck( std::get<5>( *rdctx ) );
//-----------------------------------------------------------------
// Handle failure ...
//-----------------------------------------------------------------
if( !st.IsOK() )
{
// TODO first check if we can recover using EC
XrdCl::ResponseHandler *h = std::get<4>( *rdctx );
if( h ) h->HandleResponse( new XrdCl::XRootDStatus( st ), nullptr );
return;
}
//-----------------------------------------------------------------
// Handle success ...
//-----------------------------------------------------------------
std::get<1>( *rdctx ) += nbrd; // number of bytes read
std::get<3>( *rdctx ) -= rdsize; // number of bytes requested
if( std::get<3>( *rdctx ) == 0 )
{
XrdCl::ResponseHandler *handler = std::get<4>( *rdctx );
if( handler )
{
XrdCl::ChunkInfo *ch = new XrdCl::ChunkInfo( std::get<0>( *rdctx ),
std::get<1>( *rdctx ),
std::get<2>( *rdctx ) );
XrdCl::AnyObject *rsp = new XrdCl::AnyObject();
rsp->Set( ch );
handler->HandleResponse( new XrdCl::XRootDStatus(), rsp );
}
}
};
//-------------------------------------------------------------------
// Read data from a stripe
//-------------------------------------------------------------------
block->read( strpid, rdoff, rdsize, usrbuff, callback );
//-------------------------------------------------------------------
// Update absolute offset, read length, and user buffer
//-------------------------------------------------------------------
offset += rdsize;
length -= rdsize;
usrbuff += rdsize;
}
}

private:

void Read( size_t blknb, size_t strpnb, callback_t cb )
void Read( size_t blknb, size_t strpnb, buffer_t &buffer, callback_t cb )
{
// generate the file name (blknb/strpnb)
std::string fn = objcfg.obj + '.' + std::to_string( blknb ) + '.' + std::to_string( strpnb );
// if the block/stripe does not exist it means we are reading past the end of the file
auto itr = urlmap.find( fn );
if( itr == urlmap.end() ) return cb( XrdCl::XRootDStatus(), buffer_t() );
if( itr == urlmap.end() ) return cb( XrdCl::XRootDStatus(), 0 );
// get the URL of the ZIP archive with the respective data
const std::string &url = itr->second;
// get the ZipArchive object
auto &zipptr = dataarchs[url];
// check the size of the data to be read
XrdCl::StatInfo *info = nullptr;
auto st = zipptr->Stat( fn, info );
if( !st.IsOK() ) return cb( st, buffer_t() );
if( !st.IsOK() ) return cb( st, 0 );
uint32_t rdsize = info->GetSize();
delete info;
// create a buffer for the data
auto rdbuff = std::make_shared<buffer_t>( rdsize, 0 );
buffer.resize( rdsize );
// issue the read request
XrdCl::Async( XrdCl::ReadFrom( *zipptr, fn, 0, rdsize, rdbuff->data() ) >>
[cb, rdbuff]( XrdCl::XRootDStatus &st, XrdCl::ChunkInfo &ch )
{
cb( st, std::move( *rdbuff ) );
} );
XrdCl::Async( XrdCl::ReadFrom( *zipptr, fn, 0, buffer.size(), buffer.data() ) >>
[cb]( XrdCl::XRootDStatus &st, XrdCl::ChunkInfo &ch ) { /*TODO verify integrity!!!*/ cb( st, ch.length ); } );
}

XrdCl::Pipeline ReadMetadata( size_t index )
Expand Down Expand Up @@ -350,10 +449,11 @@ namespace XrdEc
typedef std::unordered_map<std::string, buffer_t> metadata_t;
typedef std::unordered_map<std::string, std::string> urlmap_t;

ObjCfg &objcfg;
dataarchs_t dataarchs; //> map URL to ZipArchive object
metadata_t metadata; //> map URL to CD metadata
urlmap_t urlmap; //> map blknb/strpnb (data chunk) to URL
ObjCfg &objcfg;
dataarchs_t dataarchs; //> map URL to ZipArchive object
metadata_t metadata; //> map URL to CD metadata
urlmap_t urlmap; //> map blknb/strpnb (data chunk) to URL
std::shared_ptr<block_t> block; //>
};

} /* namespace XrdEc */
Expand Down

0 comments on commit 6015396

Please sign in to comment.