Skip to content

Commit

Permalink
Compilable first draft of Repair Tool
Browse files Browse the repository at this point in the history
Almost working except for the repair tool being destroyed due to a shared pointer

This version works quite well, but uses mtfile

Working version of the repair tool handles data and metadata corruption
rebased to master

Cleaned code
rebased to master

Last fixes
rebased to master

Restructured and commented repair tool functionality, the opening of archives can either be followed by a host replacement or on its own to detect and report on damaged archives to the user

Added callback for just checking a file's checksum, but has a bug with checksum comparison in reader

Fixed the archive opening and closing and the only instance that checks metadata is now the repair tool

Archive closing waits until everything is written using a pipeline final and condition locking

Fixed: We wait until all writes are finished before closing archives, WriteIntoFile is now a zip operation

Fix: Wait for all replacement archives to be opened

Added a test that corrupts two archives (or one archives in two spots), ran it successfully a couple hundred times

Fixes: Corrupt archives at correct offsets and replaced two usages of uncompressedSize by compressedSize

Fixed the actual root of the issue because corrupted offsets can lead to signflips in minus operations

Small concurrency fix and formatting

Concurrent version of repair

Fixes to make more than nbdata+nbparity hosts possible, changed unit test to use 2 hosts more

Cleaned some code

Further cleanup

Removed more unnecessary code and commented out lines

Added a test to confirm that repair fails when too much is corrupted

Added timeouts to the repair tool and a check in the opening process that checks whether one of the archives was blocked by the user. If it is the case, the tool exits immediately

Adding some timeouts seems to introduce a bug, might have to revert?

Trying to fix segfault for missing host

Fixed an error that the open pipeline would fail if not all readHealth are successful

Fixed the bugs, forgot a handler callback

Changed atomics to relaxed memory order and implemented a block pool to limit the memory consumption

Corrected a rebase mistake

Removed file repair functionality to split it into a second PR
  • Loading branch information
Wuerstchen committed Aug 25, 2022
1 parent e803d26 commit 12551ee
Show file tree
Hide file tree
Showing 19 changed files with 2,094 additions and 513 deletions.
1 change: 1 addition & 0 deletions src/XrdCl/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ if( BUILD_XRDEC )
${CMAKE_SOURCE_DIR}/src/XrdEc/XrdEcUtilities.cc
${CMAKE_SOURCE_DIR}/src/XrdEc/XrdEcStrmWriter.cc
${CMAKE_SOURCE_DIR}/src/XrdEc/XrdEcReader.cc
${CMAKE_SOURCE_DIR}/src/XrdEc/XrdEcRepairTool.cc
XrdClEcHandler.cc
)
set( ISAL_LIB isal )
Expand Down
4 changes: 2 additions & 2 deletions src/XrdCl/XrdClFileStateHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -888,10 +888,10 @@ namespace XrdCl
return XRootDStatus( stOK, suAlreadyDone );

if( self->pFileState == OpenInProgress || self->pFileState == Recovering )
return XRootDStatus( stError, errInvalidOp );
return XRootDStatus( stError, errInvalidOp, 0, "OpenInProgress or Recovering");

if( !self->pAllowBundledClose && !self->pInTheFly.empty() )
return XRootDStatus( stError, errInvalidOp );
return XRootDStatus( stError, errInvalidOp, 0, "In the fly not empty");

self->pFileState = CloseInProgress;

Expand Down
9 changes: 7 additions & 2 deletions src/XrdCl/XrdClOperations.hh
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,12 @@
#include "XrdCl/XrdClPostMaster.hh"
#include "XrdCl/XrdClDefaultEnv.hh"


namespace XrdCl
{



template<bool HasHndl> class Operation;

class Pipeline;
Expand Down Expand Up @@ -191,6 +194,7 @@ namespace XrdCl
friend class Pipeline;
friend class PipelineHandler;


public:

//------------------------------------------------------------------------
Expand Down Expand Up @@ -263,7 +267,7 @@ namespace XrdCl
XRootDStatus st;
try
{
st = RunImpl( h, timeout );
st = RunImpl( h, timeout );
}
catch( const operation_expired& ex )
{
Expand All @@ -277,7 +281,6 @@ namespace XrdCl
{
st = XRootDStatus( stError, errInternal, 0, ex.what() );
}

if( !st.IsOK() ){
ResponseJob *job = new ResponseJob(h, new XRootDStatus(st), 0, nullptr);
DefaultEnv::GetPostMaster()->GetJobManager()->QueueJob(job);
Expand Down Expand Up @@ -772,6 +775,8 @@ namespace XrdCl
//------------------------------------------------------------------------
uint16_t timeout;
};


}

#endif // __XRD_CL_OPERATIONS_HH__
40 changes: 32 additions & 8 deletions src/XrdCl/XrdClZipArchive.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ namespace XrdCl
ResponseHandler *usrHandler,
uint16_t timeout )
{
if( me.openstage != ZipArchive::Done || !me.archive.IsOpen() )
if( me.openstage != ZipArchive::Done || !me.archive.IsOpen() )
return XRootDStatus( stError, errInvalidOp );

Log *log = DefaultEnv::GetLog();
Expand Down Expand Up @@ -80,15 +80,23 @@ namespace XrdCl
filesize = cdfh->extra->compressedSize;
uint16_t descsize = cdfh->HasDataDescriptor() ?
DataDescriptor::GetSize( cdfh->IsZIP64() ) : 0;

if(filesize+descsize > nextRecordOffset)
return XRootDStatus( stError, errInvalidArgs,
errInvalidArgs, "Resulting offset of read smaller than zero." );

uint64_t fileoff = nextRecordOffset - filesize - descsize;
uint64_t offset = fileoff + relativeOffset;
uint64_t uncompressedSize = cdfh->uncompressedSize;
if( uncompressedSize == std::numeric_limits<uint32_t>::max() && cdfh->extra )
uncompressedSize = cdfh->extra->uncompressedSize;

uint64_t sizeTillEnd = relativeOffset > uncompressedSize ?
0 : uncompressedSize - relativeOffset;
if( size > sizeTillEnd ) size = sizeTillEnd;



// if it is a compressed file use ZIP cache to read from the file
if( cdfh->compressionMethod == Z_DEFLATED )
{
Expand Down Expand Up @@ -180,13 +188,14 @@ namespace XrdCl
RSP *rsp = new RSP( relativeOffset, size, usrbuff );
ZipArchive::Schedule( usrHandler, st, rsp );
}

return XRootDStatus();
}

Pipeline p = XrdCl::RdWithRsp<RSP>( me.archive, offset, size, usrbuff ) >>
[=, &me]( XRootDStatus &st, RSP &r )
{
log->Dump( ZipMsg, "[0x%x] Read %d bytes of remote data at "
log->Dump( ZipMsg, "[0x%x] Read %d bytes of remote data at "
"offset %d.", &me, r.GetLength(), r.GetOffset() );
if( usrHandler )
{
Expand All @@ -201,6 +210,7 @@ namespace XrdCl
return XRootDStatus();
}


//---------------------------------------------------------------------------
// Constructor
//---------------------------------------------------------------------------
Expand Down Expand Up @@ -304,7 +314,9 @@ namespace XrdCl
[=]( XRootDStatus &status, ChunkInfo &chunk ) mutable
{
// check the status is OK
if( !status.IsOK() ) return;
if( !status.IsOK() ) {
return;
}

const char *buff = reinterpret_cast<char*>( chunk.buffer );
while( true )
Expand All @@ -317,7 +329,7 @@ namespace XrdCl
const char *eocdBlock = EOCD::Find( buff, chunk.length );
if( !eocdBlock )
{
XRootDStatus error( stError, errDataError, 0,
XRootDStatus error( stError, errDataError, 0,
"End-of-central-directory signature not found." );
Pipeline::Stop( error );
}
Expand Down Expand Up @@ -440,9 +452,11 @@ namespace XrdCl
log->Dump( ZipMsg, "[0x%x] CD records parsed.", this );
uint64_t sumCompSize = 0;
for (auto it = cdvec.begin(); it != cdvec.end(); it++) {
sumCompSize += (*it)->compressedSize;
if ((*it)->offset > archsize || (*it)->offset + (*it)->compressedSize > archsize)
if ((*it)->offset > archsize || (*it)->offset + (*it)->compressedSize > archsize
|| (*it)->offset < sumCompSize)
throw bad_data();

sumCompSize += (*it)->compressedSize;
}
if (sumCompSize > archsize)
throw bad_data();
Expand All @@ -465,6 +479,7 @@ namespace XrdCl
break;
}
}

| XrdCl::Final( [=]( const XRootDStatus &status )
{ // finalize the pipeline by calling the user callback
if( status.IsOK() )
Expand Down Expand Up @@ -635,7 +650,9 @@ namespace XrdCl
[=]( XRootDStatus &st )
{
if( st.IsOK() ) Clear();
else openstage = Error;
else {
openstage = Error;
}
}
| XrdCl::Final( [=]( const XRootDStatus &st ) mutable
{
Expand Down Expand Up @@ -780,13 +797,16 @@ namespace XrdCl
iov[1].iov_base = const_cast<void*>( buffer );
iov[1].iov_len = size;


uint64_t wrtoff = cdoff; // we only support appending
uint32_t wrtlen = iov[0].iov_len + iov[1].iov_len;

Pipeline p;
auto wrthandler = [=]( const XRootDStatus &st ) mutable
{
if( st.IsOK() ) updated = true;
if( st.IsOK() ) {
updated = true;
}
lfhbuf.reset();
if( handler )
handler->HandleResponse( make_status( st ), nullptr );
Expand Down Expand Up @@ -882,6 +902,8 @@ namespace XrdCl
}

log->Dump( ZipMsg, "[0x%x] Appending file: %s.", this, fn.c_str() );


//-------------------------------------------------------------------------
// Create Local File Header record
//-------------------------------------------------------------------------
Expand All @@ -892,4 +914,6 @@ namespace XrdCl
return WriteImpl( size, buffer, handler, timeout );
}



} /* namespace XrdZip */
4 changes: 2 additions & 2 deletions src/XrdCl/XrdClZipArchive.hh
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
//-----------------------------------------------------------------------------
// Forward declaration needed for friendship
//-----------------------------------------------------------------------------
namespace XrdEc{ class StrmWriter; class Reader; template<bool> class OpenOnlyImpl; };
namespace XrdEc{ class StrmWriter; class Reader; class RepairTool; template<bool> class OpenOnlyImpl; };
class MicroTest;

namespace XrdCl
Expand All @@ -60,13 +60,13 @@ namespace XrdCl
{
friend class XrdEc::StrmWriter;
friend class XrdEc::Reader;
friend class XrdEc::RepairTool;
template<bool>
friend class XrdEc::OpenOnlyImpl;
friend class ::MicroTest;

template<typename RSP>
friend XRootDStatus ReadFromImpl( ZipArchive&, const std::string&, uint64_t, uint32_t, void*, ResponseHandler*, uint16_t );

public:
//-----------------------------------------------------------------------
//! Constructor
Expand Down
4 changes: 2 additions & 2 deletions src/XrdCl/XrdClZipOperations.hh
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "XrdCl/XrdClOperationHandlers.hh"
#include "XrdCl/XrdClCtx.hh"


namespace XrdCl
{

Expand Down Expand Up @@ -286,7 +287,7 @@ namespace XrdCl
//------------------------------------------------------------------------
XRootDStatus RunImpl( PipelineHandler *handler, uint16_t pipelineTimeout )
{
std::string &fn = std::get<FileNameArg>( this->args ).Get();
std::string &fn = std::get<FileNameArg>( this->args ).Get();
uint64_t offset = std::get<OffsetArg>( this->args ).Get();
uint32_t size = std::get<SizeArg>( this->args ).Get();
void *buffer = std::get<BufferArg>( this->args ).Get();
Expand Down Expand Up @@ -426,7 +427,6 @@ namespace XrdCl
std::move( size ), std::move( buffer ) ).Timeout( timeout );
}


//----------------------------------------------------------------------------
//! CloseFile operation (@see ZipOperation)
//----------------------------------------------------------------------------
Expand Down
1 change: 1 addition & 0 deletions src/XrdEc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ add_library(
XrdEcThreadPool.hh
XrdEcStrmWriter.hh XrdEcStrmWriter.cc
XrdEcReader.hh XrdEcReader.cc
XrdEcRepairTool.hh XrdEcRepairTool.cc
)

target_link_libraries(
Expand Down
114 changes: 114 additions & 0 deletions src/XrdEc/XrdEcBlkPool.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
#ifndef SRC_XRDEC_XRDECBLKPOOL_HH_
#define SRC_XRDEC_XRDECBLKPOOL_HH_

#include "XrdEc/XrdEcUtilities.hh"
#include "XrdEc/XrdEcObjCfg.hh"
#include "XrdEc/XrdEcConfig.hh"
#include "XrdEc/XrdEcThreadPool.hh"
#include "XrdEc/XrdEcReader.hh"

#include <vector>
#include <condition_variable>
#include <mutex>
#include <future>

namespace XrdEc
{
class BlockPool
{
public:

static BlockPool& Instance()
{
static BlockPool instance;
return instance;
}

std::shared_ptr<block_t> Create(ObjCfg &objcfg, Reader &reader, size_t blkid)
{
std::unique_lock<std::mutex> lck(mtx);
//---------------------------------------------------------------------
// If pool is not empty, recycle existing buffer
//---------------------------------------------------------------------
if (!pool.empty())
{
std::shared_ptr<block_t> block(std::move(pool.front()));
pool.pop();

// almost what the block constructor does except we dont make new stripes.
//block->reader = reader;
block->blkid = blkid;
block->state = std::vector<block_t::state_t>(objcfg.nbchunks, block_t::Empty);
block->pending = std::vector<block_t::pending_t>(objcfg.nbchunks);
block->recovering = 0;
block->redirectionIndex = 0;

return std::move(block);
}
//---------------------------------------------------------------------
// Check if we can create a new buffer object without exceeding the
// the maximum size of the pool
//---------------------------------------------------------------------
if (currentsize < totalsize)
{
std::shared_ptr<block_t> block = std::make_shared<block_t>(blkid, reader, objcfg);
++currentsize;
return std::move(block);
}
//---------------------------------------------------------------------
// If not, we have to wait until there is a buffer we can recycle
//---------------------------------------------------------------------
while (pool.empty())
cv.wait(lck);
std::shared_ptr<block_t> block(std::move(pool.front()));
pool.pop();

// almost what the block constructor does except we dont make new stripes.
//block->reader = reader;
block->blkid = blkid;
block->state = std::vector<block_t::state_t>(objcfg.nbchunks,
block_t::Empty);
block->pending = std::vector<block_t::pending_t>(objcfg.nbchunks);
block->recovering = 0;
block->redirectionIndex = 0;

return std::move(block);
}

//-----------------------------------------------------------------------
//! Give back a buffer to the poool
//-----------------------------------------------------------------------
void Recycle(std::shared_ptr<block_t> block)
{
//if (block invalid)
// return;
std::unique_lock<std::mutex> lck(mtx);

pool.emplace(std::move(block));
cv.notify_all();
}

private:

//-----------------------------------------------------------------------
// Default constructor
//-----------------------------------------------------------------------
BlockPool() :
totalsize(1024), currentsize(0)
{
}

BlockPool(const BlockPool&) = delete; //< Copy constructor
BlockPool(BlockPool&&) = delete; //< Move constructor
BlockPool& operator=(const BlockPool&) = delete; //< Copy assigment operator
BlockPool& operator=(BlockPool&&) = delete; //< Move assigment operator

const size_t totalsize; //< maximum size of the pool
size_t currentsize; //< current size of the pool
std::condition_variable cv;
std::mutex mtx;
std::queue<std::shared_ptr<block_t>> pool; //< the pool itself
};
}

#endif /* SRC_XRDEC_XRDECBLKPOOL_HH_ */

0 comments on commit 12551ee

Please sign in to comment.