Skip to content

Commit

Permalink
[XrdEc] Add methods to ObjCfg for generating file names/urls.
Browse files Browse the repository at this point in the history
  • Loading branch information
simonmichal committed Jan 8, 2021
1 parent 37d4cf0 commit 085a1c8
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 61 deletions.
10 changes: 1 addition & 9 deletions src/XrdEc/XrdEcConfig.hh
Expand Up @@ -52,19 +52,11 @@ namespace XrdEc
return itr->second;
}

uint8_t maxrelocate;
std::string ckstype;
uint8_t repairthreads;
std::string headnode;

private:

std::unordered_map<std::string, RedundancyProvider> redundancies;

Config() : maxrelocate( 10 ),
ckstype( "crc32" ),
repairthreads( 4 ),
headnode( "eospps.cern.ch" )
Config()
{
}

Expand Down
18 changes: 15 additions & 3 deletions src/XrdEc/XrdEcObjCfg.hh
Expand Up @@ -22,7 +22,6 @@ namespace XrdEc

ObjCfg( const std::string &obj, const std::string &mtindex, uint8_t nbdata, uint8_t nbparity, uint64_t chunksize ) :
obj( obj ),
mtindex( mtindex ),
nbchunks( nbdata + nbparity ),
nbparity( nbparity ),
nbdata( nbdata ),
Expand All @@ -35,7 +34,6 @@ namespace XrdEc
}

ObjCfg( const ObjCfg &objcfg ) : obj( objcfg.obj ),
mtindex( objcfg.mtindex ),
nbchunks( objcfg.nbchunks ),
nbparity( objcfg.nbparity ),
nbdata( objcfg.nbdata ),
Expand All @@ -47,8 +45,22 @@ namespace XrdEc
{
}

inline std::string GetDataUrl( size_t i ) const
{
return plgr[i] + obj + ".data.zip";
}

inline std::string GetMetadataUrl( size_t i ) const
{
return plgr[i] + obj + ".metadata.zip";
}

inline std::string GetFileName( size_t blknb, size_t strpnb ) const
{
return obj + '.' + std::to_string( blknb ) + '.' + std::to_string( strpnb );
}

const std::string obj;
const std::string mtindex; // index of the metadata file
const uint8_t nbchunks; // number of chunks in block
const uint8_t nbparity; // number of chunks in parity
const uint8_t nbdata; // number of chunks in data
Expand Down
6 changes: 3 additions & 3 deletions src/XrdEc/XrdEcReader.hh
Expand Up @@ -383,7 +383,7 @@ namespace XrdEc
for( size_t i = 0; i < size; ++i )
{
// generate the URL
std::string url = objcfg.plgr[i] + objcfg.obj + ".zip";
std::string url = objcfg.GetDataUrl( i );
// create the file object
dataarchs.emplace( url, std::make_shared<XrdCl::ZipArchive>() );
// open the archive
Expand Down Expand Up @@ -473,7 +473,7 @@ namespace XrdEc
void Read( size_t blknb, size_t strpnb, buffer_t &buffer, callback_t cb )
{
// generate the file name (blknb/strpnb)
std::string fn = objcfg.obj + '.' + std::to_string( blknb ) + '.' + std::to_string( strpnb );
std::string fn = objcfg.GetFileName( blknb, strpnb );
// if the block/stripe does not exist it means we are reading passed the end of the file
auto itr = urlmap.find( fn );
if( itr == urlmap.end() ) return cb( XrdCl::XRootDStatus(), 0 );
Expand Down Expand Up @@ -538,7 +538,7 @@ namespace XrdEc
// create the File object
auto file = std::make_shared<XrdCl::File>();
// prepare the URL for Open operation
std::string url = objcfg.plgr[index] + objcfg.obj + ".metadata.zip";
std::string url = objcfg.GetMetadataUrl( index );
// arguments for the Read operation
XrdCl::Fwd<uint32_t> rdsize;
XrdCl::Fwd<void*> rdbuff;
Expand Down
42 changes: 1 addition & 41 deletions src/XrdEc/XrdEcStrmWriter.cc
Expand Up @@ -22,45 +22,5 @@ namespace
namespace XrdEc
{

// void StrmWriter::WriteBlock()
// {
// const size_t size = objcfg->nbchunks;
// std::vector<XrdCl::Pipeline> writes;
// writes.reserve( size );
//
// std::vector<size_t> fileid( files->size() );
// std::iota( fileid.begin(), fileid.end(), 0 );
// std::shuffle( fileid.begin(), fileid.end(), random_engine );
//
// std::vector<size_t> spareid;
// auto itr = fileid.begin() + objcfg->nbchunks;
// for( ; itr != fileid.end() ; ++itr )
// spareid.emplace_back( *itr );
// std::shared_ptr<spare_files> spares = std::make_shared<spare_files>();
// spares->spareid.swap( spareid );
//
// for( size_t i = 0; i < size; ++i )
// {
// std::shared_ptr<size_t> fid = std::make_shared( fileid[i] );
// std::string fn = objcfg->obj + '.' + std::to_string( wrtbuff->GetBlkNb() ) + '.' + std::to_string( i );
// uint32_t checksum = checksums[i].get();
// std::shared_ptr<WrtCtx> wrtctx( new WrtCtx( checksum, fn, wrtbuff->GetChunk( i ), wrtbuff->GetStrpSize( i ) ) );
// uint32_t offset = offsets[*fid].fetch_add( wrtctx->total_size );
// uint32_t strpsize = wrtbuff->GetStrpSize( i );
// auto &file = (*files)[*fid];
// auto wrt_handler = [files, wrtctx, fid, dirs, strpsize, checksum, offset, fn]( XrdCl::XRootDStatus &st )
// {
// if( !st.IsOK() ) return;
// // create respective CDH record
// dirs[*fid].Add( fn, strpsize, checksum, offset ); // TODO figure out the right file ID !!!
// };
//
// XrdCl::rcvry_func WrtRcvry = spares->spareid.empty() ? nullptr : WrtRecovery( spares, offset, wrtctx, files );
// writes.emplace_back( XrdCl::WriteV( file, offset, wrtctx->iov, wrtctx->iovcnt ).Recovery( WrtRcvry ) >> wrt_handler );
// }
//
// std::shared_ptr<WrtBuff> pend_buff = std::move( wrtbuff );
// std::shared_ptr<WrtCallback> _wrt_callback = wrt_callback;
// XrdCl::Async( XrdCl::Parallel( writes ) >> [pend_buff, _wrt_callback]( XrdCl::XRootDStatus &st ){ if( !st.IsOK() ) _wrt_callback->Run( st ); } );
// }

}
13 changes: 8 additions & 5 deletions src/XrdEc/XrdEcStrmWriter.hh
Expand Up @@ -132,7 +132,7 @@ namespace XrdEc
public:

//-----------------------------------------------------------------------
// Constructor
//! Constructor
//-----------------------------------------------------------------------
StrmWriter( const ObjCfg &objcfg ) : objcfg( objcfg ),
writer_thread_stop( false ),
Expand All @@ -142,6 +142,9 @@ namespace XrdEc
{
}

//-----------------------------------------------------------------------
//! Destructor
//-----------------------------------------------------------------------
virtual ~StrmWriter()
{
writer_thread_stop = true;
Expand All @@ -161,7 +164,7 @@ namespace XrdEc

for( size_t i = 0; i < size; ++i )
{
std::string url = objcfg.plgr[i] + objcfg.obj + ".zip";
std::string url = objcfg.GetDataUrl( i );
XrdCl::Ctx<XrdCl::ZipArchive> zip( *dataarchs[i] );
opens.emplace_back( XrdCl::OpenArchive( zip, url, XrdCl::OpenFlags::New | XrdCl::OpenFlags::Write ) );
}
Expand Down Expand Up @@ -380,7 +383,7 @@ namespace XrdEc
uint64_t blksize = 0;
for( size_t strpnb = 0; strpnb < nbchunks; ++strpnb )
{
std::string fn = objcfg.obj + '.' + std::to_string( blknb ) + '.' + std::to_string( strpnb );
std::string fn = objcfg.GetFileName( blknb, strpnb );
uint32_t crc32c = wrtbuff->GetCrc32c( strpnb );
uint64_t strpsize = wrtbuff->GetStrpSize( strpnb );
char* strpbuff = wrtbuff->GetStrpBuff( strpnb );
Expand Down Expand Up @@ -468,7 +471,7 @@ namespace XrdEc
mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
for( size_t i = 0; i < cdcnt; ++i )
{
std::string fn = objcfg.plgr[i] + objcfg.obj + ".zip"; // file name (URL of the data archive)
std::string fn = objcfg.GetDataUrl( i ); // file name (URL of the data archive)
buffer_t buff( dataarchs[i]->GetCD() ); // raw data buffer (central directory of the data archive)
uint32_t cksum = crc32c( 0, buff.data(), buff.size() ); // crc32c of the buffer
lfhs.emplace_back( fn, cksum, buff.size(), time( 0 ) ); // LFH record for the buffer
Expand Down Expand Up @@ -526,7 +529,7 @@ namespace XrdEc
//-------------------------------------------------------------------
// replicate the metadata
//-------------------------------------------------------------------
std::string url = objcfg.plgr[i] + objcfg.obj + ".metadata.zip";
std::string url = objcfg.GetMetadataUrl( i );
metadataarchs.emplace_back( std::make_shared<XrdCl::File>() );
XrdCl::Pipeline p = XrdCl::Open( *metadataarchs[i], url, XrdCl::OpenFlags::New | XrdCl::OpenFlags::Write )
| XrdCl::Write( *metadataarchs[i], 0, zipbuff->size(), zipbuff->data() )
Expand Down

0 comments on commit 085a1c8

Please sign in to comment.