Skip to content

Commit

Permalink
Merge pull request #1775 from Nikipedia/ScanPlusRepair
Browse files Browse the repository at this point in the history
Add repair tool
  • Loading branch information
simonmichal committed Aug 31, 2022
2 parents 2f47c8e + 19b1ba4 commit 5ef15e7
Show file tree
Hide file tree
Showing 6 changed files with 752 additions and 11 deletions.
90 changes: 90 additions & 0 deletions src/XrdCl/XrdClZipArchive.cc
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,96 @@ namespace XrdCl
return ReadFromImpl<ChunkInfo>( *this, fn, offset, size, buffer, handler, timeout );
}

/*
* Writes a buffer into the file at an offset. Not tested with compressed archives.
*/
XRootDStatus ZipArchive::WriteIntoFile(const std::string &fn, uint64_t relativeOffset,
uint32_t size, uint32_t chksum, const void *usrbuff, ResponseHandler *handler,
uint16_t timeout) {
if (openstage != ZipArchive::Done || !archive.IsOpen())
return XRootDStatus(stError, errInvalidOp);

Log *log = DefaultEnv::GetLog();

auto cditr = cdmap.find(fn);
if (cditr == cdmap.end())
return XRootDStatus(stError, errNotFound, errNotFound,
"File not found.");

CDFH *cdfh = cdvec[cditr->second].get();

// check if the file is compressed, for now we only support uncompressed and inflate/deflate compression
if (cdfh->compressionMethod != 0 && cdfh->compressionMethod != Z_DEFLATED)
return XRootDStatus(stError, errNotSupported, 0,
"The compression algorithm is not supported!");

uint64_t offset = CDFH::GetOffset(*cdfh);

LFH lfh(fn, chksum, size, time(0));


std::shared_ptr<buffer_t> lfhbuf;
uint32_t lfhlen = lfh.lfhSize;
lfhbuf = std::make_shared<buffer_t>();
lfhbuf->reserve(size+lfhlen);
lfh.Serialize(*lfhbuf);
auto usrchars = (char*)usrbuff;
for(uint32_t u = 0; u < size; u++){
lfhbuf->push_back(usrchars[u]);
}
size += lfhlen;



// if it is a compressed file use ZIP cache to read from the file
if (cdfh->compressionMethod == Z_DEFLATED) {

// issue remote write
if (relativeOffset > cdfh->compressedSize)
return XRootDStatus(); // there's nothing to do,
// we already have all the data locally
uint32_t wrsize = size;
// check if this is the last read (we reached the end of
// file from user perspective)
if (relativeOffset + size >= cdfh->uncompressedSize) {
// if yes, make sure we readout all the compressed data
// Note: In a patological case the compressed size may
// be greater than the uncompressed size
wrsize =
cdfh->compressedSize > relativeOffset ?
cdfh->compressedSize - relativeOffset : 0;
}
// make sure we are not reading past the end of
// compressed data
if (relativeOffset + size > cdfh->compressedSize)
wrsize = cdfh->compressedSize - relativeOffset;

// now write the data ...
Pipeline p =
XrdCl::Write(archive, offset, wrsize, usrbuff)
>> [=](XRootDStatus &st) {
Log *log = DefaultEnv::GetLog();
log->Dump(ZipMsg,
"Wrote bytes to remote data.");
};
Async(std::move(p), timeout);

return XRootDStatus();
}

Pipeline p = XrdCl::Write(archive, offset, size, lfhbuf->data())
>> [=](XRootDStatus &st) mutable {
log->Dump( ZipMsg, "Wrote bytes to remote data");
if (handler) {
XRootDStatus *status = ZipArchive::make_status(st);
handler->HandleResponse(status, nullptr);
}
lfhbuf.reset();
};
Async(std::move(p), timeout);
return XRootDStatus();
}

//---------------------------------------------------------------------------
// PgRead data from a given file
//---------------------------------------------------------------------------
Expand Down
19 changes: 19 additions & 0 deletions src/XrdCl/XrdClZipArchive.hh
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,25 @@ namespace XrdCl
ResponseHandler *handler,
uint16_t timeout = 0 );

//-----------------------------------------------------------------------
//! Overwrite data in a given file
//!
//! @param fn : the name of the file into which we are going to write
//! @param offset : offset within the file to write at
//! @param size : number of bytes to be written
//! @param buffer : the buffer for the data
//! @param handler : user callback
//! @param timeout : operation timeout
//! @return : the status of the operation
//-----------------------------------------------------------------------
XRootDStatus WriteIntoFile( const std::string &fn,
uint64_t offset,
uint32_t size,
uint32_t chksum,
const void *buffer,
ResponseHandler *handler,
uint16_t timeout = 0 );

//-----------------------------------------------------------------------
//! PgRead data from a given file
//!
Expand Down
62 changes: 62 additions & 0 deletions src/XrdCl/XrdClZipOperations.hh
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,68 @@ namespace XrdCl
std::move( size ), std::move( buffer ) ).Timeout( timeout );
}

//----------------------------------------------------------------------------
//! WriteIntoFile operation (@see ZipOperation)
//----------------------------------------------------------------------------
template<bool HasHndl>
class WriteIntoFileImpl: public ZipOperation<WriteIntoFileImpl, HasHndl, Resp<void>,
Arg<std::string>, Arg<uint64_t>, Arg<uint32_t>, Arg<uint32_t>, Arg<const void*>>
{
public:

//------------------------------------------------------------------------
//! Inherit constructors from FileOperation (@see FileOperation)
//------------------------------------------------------------------------
using ZipOperation<WriteIntoFileImpl, HasHndl, Resp<void>, Arg<std::string>,
Arg<uint64_t>,Arg<uint32_t>, Arg<uint32_t>, Arg<const void*>>::ZipOperation;

//------------------------------------------------------------------------
//! Argument indexes in the args tuple
//------------------------------------------------------------------------
enum { FnArg, OffsetArg, SizeArg, CrcArg, BufferArg };

//------------------------------------------------------------------------
//! @return : name of the operation (@see Operation)
//------------------------------------------------------------------------
std::string ToString()
{
return "WriteIntoFile";
}

protected:

//------------------------------------------------------------------------
//! RunImpl operation (@see Operation)
//!
//! @param params : container with parameters forwarded from
//! previous operation
//! @return : status of the operation
//------------------------------------------------------------------------
XRootDStatus RunImpl( PipelineHandler *handler, uint16_t pipelineTimeout )
{
std::string &fn = std::get<FnArg>( this->args ).Get();
uint64_t offset = std::get<OffsetArg>(this->args).Get();
uint32_t crc32 = std::get<CrcArg>( this->args ).Get();
uint32_t size = std::get<SizeArg>( this->args ).Get();
const void *buffer = std::get<BufferArg>( this->args ).Get();
uint16_t timeout = pipelineTimeout < this->timeout ?
pipelineTimeout : this->timeout;
return this->zip->WriteIntoFile( fn, offset, size, crc32, buffer, handler, timeout );
}
};

//----------------------------------------------------------------------------
//! Factory for creating ArchiveReadImpl objects
//----------------------------------------------------------------------------
inline WriteIntoFileImpl<false> WriteIntoFile( Ctx<ZipArchive> zip, Arg<std::string> fn,
Arg<uint64_t> offset,
Arg<uint32_t> size, Arg<uint32_t> crc32,
Arg<const void*> buffer, uint16_t timeout = 0 )
{
return WriteIntoFileImpl<false>( std::move( zip ), std::move( fn ), std::move(offset), std::move( size ),
std::move( crc32 ), std::move( buffer ) ).Timeout( timeout );
}


//----------------------------------------------------------------------------
//! CloseFile operation (@see ZipOperation)
Expand Down

0 comments on commit 5ef15e7

Please sign in to comment.