Skip to content

Commit

Permalink
[XrdEc] Refactor Reader, part 2.
Browse files Browse the repository at this point in the history
  • Loading branch information
simonmichal committed Jan 11, 2021
1 parent 963fb66 commit b79ec12
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 161 deletions.
4 changes: 3 additions & 1 deletion src/XrdCl/XrdClZipArchive.hh
Expand Up @@ -44,7 +44,7 @@
//-----------------------------------------------------------------------------
// Forward declaration needed for friendship
//-----------------------------------------------------------------------------
namespace XrdEc{ class StrmWriter; class Reader; };
namespace XrdEc{ class StrmWriter; class Reader; template<bool> class OpenOnlyImpl; };

namespace XrdCl
{
Expand All @@ -62,6 +62,8 @@ namespace XrdCl
{
friend class XrdEc::StrmWriter;
friend class XrdEc::Reader;
template<bool>
friend class XrdEc::OpenOnlyImpl;

public:
//-----------------------------------------------------------------------
Expand Down
307 changes: 155 additions & 152 deletions src/XrdEc/XrdEcReader.cc
Expand Up @@ -5,9 +5,15 @@
* Author: simonm
*/

#include "XrdEcReader.hh"
#include "XrdEc/XrdEcReader.hh"
#include "XrdEc/XrdEcUtilities.hh"
#include "XrdEc/XrdEcConfig.hh"
#include "XrdEc/XrdEcObjCfg.hh"

namespace
#include "XrdCl/XrdClParallelOperation.hh"
#include "XrdCl/XrdClZipOperations.hh"

namespace XrdEc
{
//---------------------------------------------------------------------------
// OpenOnly operation (@see ZipOperation) - a private ZIP operation
Expand Down Expand Up @@ -65,10 +71,7 @@ namespace
{
return OpenOnlyImpl<false>( std::move( zip ), std::move( fn ) ).Timeout( timeout );
}
}

namespace XrdEc
{
//-------------------------------------------------------------------------
// A single data block
//-------------------------------------------------------------------------
Expand Down Expand Up @@ -349,7 +352,7 @@ namespace XrdEc
//---------------------------------------------------------------------------
// Open the erasure coded / striped object
//---------------------------------------------------------------------------
void ReaderOpen( XrdCl::ResponseHandler *handler )
void Reader::Open( XrdCl::ResponseHandler *handler )
{
const size_t size = objcfg.plgr.size();
std::vector<XrdCl::Pipeline> opens; opens.reserve( size );
Expand Down Expand Up @@ -445,166 +448,166 @@ namespace XrdEc
length -= rdsize;
usrbuff += rdsize;
}
}

//-------------------------------------------------------------------------
// Read data from given stripes from given block
//-------------------------------------------------------------------------
void Reader::Read( size_t blknb, size_t strpnb, buffer_t &buffer, callback_t cb );
{
// generate the file name (blknb/strpnb)
std::string fn = objcfg.GetFileName( blknb, strpnb );
// if the block/stripe does not exist it means we are reading passed the end of the file
auto itr = urlmap.find( fn );
if( itr == urlmap.end() ) return cb( XrdCl::XRootDStatus(), 0 );
// get the URL of the ZIP archive with the respective data
const std::string &url = itr->second;
// get the ZipArchive object
auto &zipptr = dataarchs[url];
// check the size of the data to be read
XrdCl::StatInfo *info = nullptr;
auto st = zipptr->Stat( fn, info );
if( !st.IsOK() ) return cb( st, 0 );
uint32_t rdsize = info->GetSize();
delete info;
// create a buffer for the data
buffer.resize( rdsize );
// issue the read request
XrdCl::Async( XrdCl::ReadFrom( *zipptr, fn, 0, buffer.size(), buffer.data() ) >>
[zipptr, fn, cb]( XrdCl::XRootDStatus &st, XrdCl::ChunkInfo &ch )
//-------------------------------------------------------------------------
// on-definition is not allowed here beforeiven stripes from given block
//-------------------------------------------------------------------------
void Reader::Read( size_t blknb, size_t strpnb, buffer_t &buffer, callback_t cb )
{
// generate the file name (blknb/strpnb)
std::string fn = objcfg.GetFileName( blknb, strpnb );
// if the block/stripe does not exist it means we are reading passed the end of the file
auto itr = urlmap.find( fn );
if( itr == urlmap.end() ) return cb( XrdCl::XRootDStatus(), 0 );
// get the URL of the ZIP archive with the respective data
const std::string &url = itr->second;
// get the ZipArchive object
auto &zipptr = dataarchs[url];
// check the size of the data to be read
XrdCl::StatInfo *info = nullptr;
auto st = zipptr->Stat( fn, info );
if( !st.IsOK() ) return cb( st, 0 );
uint32_t rdsize = info->GetSize();
delete info;
// create a buffer for the data
buffer.resize( rdsize );
// issue the read request
XrdCl::Async( XrdCl::ReadFrom( *zipptr, fn, 0, buffer.size(), buffer.data() ) >>
[zipptr, fn, cb]( XrdCl::XRootDStatus &st, XrdCl::ChunkInfo &ch )
{
//---------------------------------------------------
// If read failed there's nothing to do, just pass the
// status to user callback
//---------------------------------------------------
if( !st.IsOK() )
{
//---------------------------------------------------
// If read failed there's nothing to do, just pass the
// status to user callback
//---------------------------------------------------
if( !st.IsOK() )
{
cb( st, 0 );
return;
}
//---------------------------------------------------
// Get the checksum for the read data
//---------------------------------------------------
uint32_t orgcksum = 0;
auto s = zipptr->GetCRC32( fn, orgcksum );
//---------------------------------------------------
// If we cannot extract the checksum assume the data
// are corrupted
//---------------------------------------------------
if( !st.IsOK() )
{
cb( st, 0 );
return;
}
//---------------------------------------------------
// Verify data integrity
//---------------------------------------------------
uint32_t cksum = crc32c( 0, ch.buffer, ch.length );
if( orgcksum != cksum )
{
cb( XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errDataError ), 0 );
return;
}
//---------------------------------------------------
// All is good, we can call now the user callback
//---------------------------------------------------
cb( XrdCl::XRootDStatus(), ch.length );
} );
}
cb( st, 0 );
return;
}
//---------------------------------------------------
// Get the checksum for the read data
//---------------------------------------------------
uint32_t orgcksum = 0;
auto s = zipptr->GetCRC32( fn, orgcksum );
//---------------------------------------------------
// If we cannot extract the checksum assume the data
// are corrupted
//---------------------------------------------------
if( !st.IsOK() )
{
cb( st, 0 );
return;
}
//---------------------------------------------------
// Verify data integrity
//---------------------------------------------------
uint32_t cksum = crc32c( 0, ch.buffer, ch.length );
if( orgcksum != cksum )
{
cb( XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errDataError ), 0 );
return;
}
//---------------------------------------------------
// All is good, we can call now the user callback
//---------------------------------------------------
cb( XrdCl::XRootDStatus(), ch.length );
} );
}

//-----------------------------------------------------------------------
// Read metadata for the object
//-----------------------------------------------------------------------
XrdCl::Pipeline Reader::ReadMetadata( size_t index )
{
const size_t size = objcfg.plgr.size();
// create the File object
auto file = std::make_shared<XrdCl::File>();
// prepare the URL for Open operation
std::string url = objcfg.GetMetadataUrl( index );
// arguments for the Read operation
XrdCl::Fwd<uint32_t> rdsize;
XrdCl::Fwd<void*> rdbuff;

return XrdCl::Open( *file, url, XrdCl::OpenFlags::Read ) >>
[=]( XrdCl::XRootDStatus &st, XrdCl::StatInfo &info )
//-----------------------------------------------------------------------
// Read metadata for the object
//-----------------------------------------------------------------------
XrdCl::Pipeline Reader::ReadMetadata( size_t index )
{
const size_t size = objcfg.plgr.size();
// create the File object
auto file = std::make_shared<XrdCl::File>();
// prepare the URL for Open operation
std::string url = objcfg.GetMetadataUrl( index );
// arguments for the Read operation
XrdCl::Fwd<uint32_t> rdsize;
XrdCl::Fwd<void*> rdbuff;

return XrdCl::Open( *file, url, XrdCl::OpenFlags::Read ) >>
[=]( XrdCl::XRootDStatus &st, XrdCl::StatInfo &info )
{
if( !st.IsOK() )
{
if( !st.IsOK() )
{
if( index + 1 < size )
XrdCl::Pipeline::Replace( ReadMetadata( index + 1 ) );
return;
}
// prepare the args for the subsequent operation
rdsize = info.GetSize();
rdbuff = new char[info.GetSize()];
if( index + 1 < size )
XrdCl::Pipeline::Replace( ReadMetadata( index + 1 ) );
return;
}
| XrdCl::Read( *file, 0, rdsize, rdbuff ) >>
[=]( XrdCl::XRootDStatus &st, XrdCl::ChunkInfo &ch )
// prepare the args for the subsequent operation
rdsize = info.GetSize();
rdbuff = new char[info.GetSize()];
}
| XrdCl::Read( *file, 0, rdsize, rdbuff ) >>
[=]( XrdCl::XRootDStatus &st, XrdCl::ChunkInfo &ch )
{
if( !st.IsOK() )
{
if( !st.IsOK() )
{
if( index + 1 < size )
XrdCl::Pipeline::Replace( ReadMetadata( index + 1 ) );
return;
}
// now parse the metadata
if( !ParseMetadata( ch ) )
{
if( index + 1 < size )
XrdCl::Pipeline::Replace( ReadMetadata( index + 1 ) );
return;
}
if( index + 1 < size )
XrdCl::Pipeline::Replace( ReadMetadata( index + 1 ) );
return;
}
| XrdCl::Final(
[=]( const XrdCl::XRootDStatus& )
// now parse the metadata
if( !ParseMetadata( ch ) )
{
// deallocate the buffer if necessary
if( rdbuff.Valid() )
{
char* buffer = reinterpret_cast<char*>( *rdbuff );
delete[] buffer;
}
// close the file if necessary (we don't really care about the result)
if( file->IsOpen() )
XrdCl::Async( XrdCl::Close( *file ) >> [file]( XrdCl::XRootDStatus& ){ } );
} );
}
if( index + 1 < size )
XrdCl::Pipeline::Replace( ReadMetadata( index + 1 ) );
return;
}
}
| XrdCl::Final(
[=]( const XrdCl::XRootDStatus& )
{
// deallocate the buffer if necessary
if( rdbuff.Valid() )
{
char* buffer = reinterpret_cast<char*>( *rdbuff );
delete[] buffer;
}
// close the file if necessary (we don't really care about the result)
if( file->IsOpen() )
XrdCl::Async( XrdCl::Close( *file ) >> [file]( XrdCl::XRootDStatus& ){ } );
} );
}

//-----------------------------------------------------------------------
// Parse metadata from chunk info object
//-----------------------------------------------------------------------
bool Reader::ParseMetadata( XrdCl::ChunkInfo &ch )
{
const size_t mincnt = objcfg.nbdata + objcfg.nbparity;
const size_t maxcnt = objcfg.plgr.size();
//-----------------------------------------------------------------------
// Parse metadata from chunk info object
//-----------------------------------------------------------------------
bool Reader::ParseMetadata( XrdCl::ChunkInfo &ch )
{
const size_t mincnt = objcfg.nbdata + objcfg.nbparity;
const size_t maxcnt = objcfg.plgr.size();

char *buffer = reinterpret_cast<char*>( ch.buffer );
size_t length = ch.length;
char *buffer = reinterpret_cast<char*>( ch.buffer );
size_t length = ch.length;

for( size_t i = 0; i < maxcnt; ++i )
for( size_t i = 0; i < maxcnt; ++i )
{
uint32_t signature = XrdZip::to<uint32_t>( buffer );
if( signature != XrdZip::LFH::lfhSign )
{
uint32_t signature = XrdZip::to<uint32_t>( buffer );
if( signature != XrdZip::LFH::lfhSign )
{
if( i + 1 < mincnt ) return false;
break;
}
XrdZip::LFH lfh( buffer );
// check if we are not reading passed the end of the buffer
if( lfh.lfhSize + lfh.uncompressedSize > length ) return false;
buffer += lfh.lfhSize;
length -= lfh.lfhSize;
// verify the checksum
uint32_t crc32val = crc32c( 0, buffer, lfh.uncompressedSize );
if( crc32val != lfh.ZCRC32 ) return false;
// keep the metadata
metadata.emplace( lfh.filename, buffer_t( buffer, buffer + lfh.uncompressedSize ) );
buffer += lfh.uncompressedSize;
length -= lfh.uncompressedSize;
if( i + 1 < mincnt ) return false;
break;
}

return true;
XrdZip::LFH lfh( buffer );
// check if we are not reading passed the end of the buffer
if( lfh.lfhSize + lfh.uncompressedSize > length ) return false;
buffer += lfh.lfhSize;
length -= lfh.lfhSize;
// verify the checksum
uint32_t crc32val = crc32c( 0, buffer, lfh.uncompressedSize );
if( crc32val != lfh.ZCRC32 ) return false;
// keep the metadata
metadata.emplace( lfh.filename, buffer_t( buffer, buffer + lfh.uncompressedSize ) );
buffer += lfh.uncompressedSize;
length -= lfh.uncompressedSize;
}

return true;
}

} /* namespace XrdEc */

0 comments on commit b79ec12

Please sign in to comment.