Skip to content

Commit

Permalink
Duplicated the read method to use either shared pointer or reference …
Browse files Browse the repository at this point in the history
…for the buffer
  • Loading branch information
Wuerstchen committed Aug 25, 2022
1 parent 30236db commit 99987d6
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 1 deletion.
83 changes: 82 additions & 1 deletion src/XrdEc/XrdEcRepairTool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ bool RepairTool::error_correction( std::shared_ptr<block_t> &self, RepairTool *w
{
size_t strpid = i++;
if( self->state[strpid] != block_t::Empty ) continue;
self->reader.Read( self->blkid, strpid, self->stripes[strpid],
writer->Read( self->blkid, strpid, self->stripes[strpid],
RepairTool::update_callback( self, writer, strpid, timeout ) ,timeout);
self->state[strpid] = block_t::Loading;
++loadingcnt;
Expand Down Expand Up @@ -1193,6 +1193,87 @@ void RepairTool::Read( size_t blknb, size_t strpnb, std::shared_ptr<buffer_t> bu
}, timeout );
}

void RepairTool::Read( size_t blknb, size_t strpnb, buffer_t &buffer, callback_t cb, uint16_t timeout )
{
// generate the file name (blknb/strpnb)
std::string fn = objcfg.GetFileName( blknb, strpnb );
// if the block/stripe does not exist it means we are reading passed the end of the file
auto itr = urlmap.find( fn );
if( itr == urlmap.end() )
{
auto st = !IsMissing( fn ) ? XrdCl::XRootDStatus() :
XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errNotFound );
ThreadPool::Instance().Execute( cb, st, 0 );
return;
}
// get the URL of the ZIP archive with the respective data
const std::string &url = itr->second;

if(redirectionMap.find(url) != redirectionMap.end()){
auto st = XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errRedirect);
ThreadPool::Instance().Execute(cb, st, 0);
return;
}

// get the ZipArchive object
auto &zipptr = readDataarchs[url];
// check the size of the data to be read
XrdCl::StatInfo *info = nullptr;
auto st = zipptr->Stat( fn, info );
if( !st.IsOK() )
{
ThreadPool::Instance().Execute( cb, st, 0 );
return;
}
uint32_t rdsize = info->GetSize();
delete info;
// create a buffer for the data
buffer.resize( objcfg.chunksize );
// issue the read request
XrdCl::Async( XrdCl::ReadFrom( *zipptr, fn, 0, rdsize, buffer.data() ) >>
[zipptr, fn, cb, &buffer, this, url, timeout]( XrdCl::XRootDStatus &st, XrdCl::ChunkInfo &ch )
{
//---------------------------------------------------
// If read failed there's nothing to do, just pass the
// status to user callback
//---------------------------------------------------
if( !st.IsOK() )
{
cb( XrdCl::XRootDStatus(st.status, "Read failed"), 0 );
return;
}
//---------------------------------------------------
// Get the checksum for the read data
//---------------------------------------------------
uint32_t orgcksum = 0;
auto s = zipptr->GetCRC32(fn, orgcksum);
//---------------------------------------------------
// If we cannot extract the checksum assume the data
// are corrupted
//---------------------------------------------------
if( !s.IsOK() )
{
cb( XrdCl::XRootDStatus(s.status, s.code, s.errNo, "Chksum fail"), 0 );
return;
}
//---------------------------------------------------
// Verify data integrity
//---------------------------------------------------
uint32_t cksum = objcfg.digest( 0, ch.buffer, ch.length );
if( orgcksum != cksum )
{
cb( XrdCl::XRootDStatus( XrdCl::stError, "Chksum unequal" ), 0 );
return;
}
//---------------------------------------------------
// All is good, we can call now the user callback
//---------------------------------------------------
cb(XrdCl::XRootDStatus(), ch.length);
return;

}, timeout );
}

//-----------------------------------------------------------------------
//! Check if chunk file name is missing
//-----------------------------------------------------------------------
Expand Down
10 changes: 10 additions & 0 deletions src/XrdEc/XrdEcRepairTool.hh
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,16 @@ private:
* @param exactControl
*/
void Read( size_t blknb, size_t strpnb, std::shared_ptr<buffer_t> buffer, callback_t cb, uint16_t timeout = 0);
/**
* Initiates the actual read from disk, calls update_callback afterwards
* @param blknb
* @param strpnb
* @param buffer
* @param cb
* @param timeout
* @param exactControl
*/
void Read( size_t blknb, size_t strpnb, buffer_t &buffer, callback_t cb, uint16_t timeout = 0);
/**
* Sets the state of the stripe we read to okay or missing and calls error correction again.
* @param self
Expand Down

0 comments on commit 99987d6

Please sign in to comment.